Skip to main content

云上如何处理消息

分类:  Azure物联网 标签:  #Azure #IoT Hub # #入门 #指南 发布于: 2023-06-13 22:54:49

我们之前介绍了如何通过设备向云发送消息,这类消息我们称之为Device-To-Clound消息,它的适用场景是设备上的基于时间序列的遥测数据,这些数据的吞吐量可以是非常大,频率也可以是非常高,并且这些数据发送到Azure IoT Hub后,会被暂存在云上,最大可以存储七天,可以在创建Azure IoT Hub的时候指定,或者通过Azure Portal和其他工具进行调整。另外这些数据是根据设备的Id进行分区, 用户无法调整用于分区的key, 必须是设备ID, 但是用户可以调整分区的数量以及Azure IoT Hub的单元数(unit)。

数据存储到Azure IoT Hub之后,我们需要注意到另外一个特点,那就是Azure IoT Hub默认的消息存储底层是兼容Azure Event Hub的,因此如果您需要处理设备发送到云上的数据时,有如下一些方法:

  1. 使用Azure Event HubSDK监听事件,并读取处理消息,关于这一点我们之前的文档里也有一个演示,但是这种处理方法在大数据量场景时不实用。
  2. 使用消息路由(Message Route)将消息自动转发到其他的服务上,然后结合其他的数据存储和分析方法做进一步的处理,可以路由到:
    1. Event Hub
    2. Service Bus queue
    3. Service Bus topic
    4. Azure Storage

我们来学习一下如何配置消息路由。

Message Route

消息路由是根据一定的条件将设备发送到云的消息转发到Azure IoT Hub支持的四种服务之一,创建一个消息路由,包括两步:

  1. 创建一个自定义的终结点
  2. 根据条件创建一个路由指向刚刚创建的自定义终结点。

注意
终结点是指设备将消息发送到云端之后,具体由哪个服务最终接收消息,Azure IoT Hub提供的默认终结点是兼容Event Hub的,用于自定义的终结点是我们之前讨论的四种服务之一,每一个IoT Hub最多配置10个自定义终结点。

我们来演示一下如何配置消息路由,我们选择将消息路由到Azure Storage, 这样做的好处是,Stoarge可以存储大量的数据,而且都是持久化的数量,另外Azure提供的很多分析服务都可以直接使用Azure Storage里的数据,例如Azure DatabricksAzure Synapse, 分析数据之后,还可以直接使用Power BI呈现,非常方便。

  1. 创建Azure Storage容器
    第一步通过Azure Portal找到Azure Stoarge的资源(如果没有请创建一个), 在左侧菜单找到Containers, 然后创建一个新的ContainersMessages, 用于存储路由的消息。

  2. Azure IoT Hub上创建自定义终结
    使用Azure Portal找到Azure IoT Hub的资源之后,从左侧菜单: Hub Settings -> Message routing, 然后选择Custom endpoints:



    选择Add -> Storage



    输入自定义终结点的名字以及选择一个Storage的容器, 跳出的窗口首先选择账号



    然后选择容器:



    选择完成之后回到之前的设置页面:
    两个重要的设置:
    一个是每次批量传送消息的时间间隔,一个是每个文件的最大大小:




    最后还有一些重要的设置:



    如上图,你可以选择存储的文件格式,目前仅仅支持AVROJson, 文件放置的路径格式,以及和Azure Stoarge之间的认证方法。
    最后点击Create按钮就可以完成自定义终结点的创建。

  3. 创建消息路由
    完成了自定义终结点的创建之后,只是完成了第一步,要想完成消息路由,我们还需要明确的创建一个路由路径,该路径指向我们之前创建的自定义终结点。



    如上图我们可以创建一个路由



    然后会跳出一个页面,这个页面用于我们配置我们的消息路由,设置好名字之后,我们可以选择Endpoint, 请仔细看一下上图,我们会发现内置的终结点也在列表中出现,即:Built-in Endpoint -> Event, 当然我们这里选择的是我们自定义的终结点。



    我们这一步需要选择消息的来源,从上述列表中可以看到,除了我们设备发送给云的消息之外,我们还有其他几类由Azure IoT Hub自主产生的消息类型:

    • Device Telemetry Message: 这个就是指我们设备发送给云的消息,我们实例中应该选择这个。
    • Device Lifecycle Events: 这个是由Azure IoT Hub自主产生的消息,用于监控设备的生命周期。
    • Device Connection State Event: 也是Azure IoT Hub自主产生的消息,连接状态的改变。
    • Device Twin Change EventAzure IoT Hub自主产生的消息,Device Twin改变事件。
    • Device Job Lifecycle EventAzure IoT Hub自主产生的消息,Device Job的生命周期监控。

    我们的演示中选择Device Telemetry Message表示设备向云发送的遥测数据。



    最后一个非常重要的部分就是我们需要选择哪些消息路由到我们自定义的终结点上。上图所所以的Routing Query是一个布尔表达式,根据表达式的条件计算的结果为True,那么该消息会被路由,反之则不路由。在我们这个实例中直接填充了一个True, 表示所有的消息都会被路由到我们自定义的总结点上。最后可以点击一下test按钮测试一下路由。

到这里我们介绍了完如何创建一个消息路。

消息路由的查询条件

我们刚刚在创建消息路由的时候,最后一个步骤有一个设置Routing Query的地方,这里我们需要输入一个表达式,针对于每条消息,该表达式的计算结果是true, 才会被路由,如果计算结果为false则不会路由,那么我们如何书写这个表达式呢?

首先我们回顾一下上一章我们讨论了每个D2C消息的格式,它主要包括:

  • 系统属性
  • 应用属性
  • 消息体

每条消息的这个三个部分均可以在表达式里进行应用,并且参加表达式计算。
另外还可以引用设备本身(主要是指Device Twin)的信息参与表达式的计算,例如Device Twintags这部分值,打个比方,例如我们为每个设备设置了一个Device Twin的tags : Location, 我们只需要locationshanghai的消息路由到名字为Shanghai的容器里。

我们接下来学习一下如何在Routing Query引用和计算这些数据。

系统属性

要在查询表达式里引用系统属性,需要使用如下的方式:

$contentEncoding = 'UTF-8' and $contentType = 'application/JSON

引用系统属性必须使用前导 $, 例如要引用系统属性iothub-connection-device-id, 引用名字是:connectionDeviceId

注意
这里回想一下我们上章讨论的系统属性名字和它的引用Id或者名字。也请回想一下我们有多少种系统属性。

应用属性

我们上一章也介绍过应用属性,应用属性直接引用,例如:

"appProperties": { 
      "processingPath": "{cold | warm | hot}", 
      "verbose": "{true, false}", 
      "severity": 1-5, 
      "testDevice": "{true | false}" 
    }, 

引用应用属性:

processingPath = 'cold'

我们合并一下系统属性和应用属性:

$contentEncoding = 'UTF-8' AND processingPath = 'hot'


引用消息体

假设消息体是如下:

  "Weather": {
        "Temperature": 50,
        "Time": "2017-03-09T00:00:00.000Z",
        "PrevTemperatures": [
            20,
            30,
            40
        ],
        "IsEnabled": true,
        "Location": {
            "Street": "One Microsoft Way",
            "City": "Redmond",
            "State": "WA"
        },
        "HistoricalData": [
            {
                "Month": "Feb",
                "Temperature": 40
            },
            {
                "Month": "Jan",
                "Temperature": 30
            }
        ]
    }

应用消息体直接是:

$body.Weather.HistoricalData[0].Month = 'Feb'

或者是:

$body.Weather.Temperature = 50 AND $body.Weather.IsEnabled
length($body.Weather.Location.State) = 2

所以引用消息体必须是使用前导$body

Device Twin

这就简单了,给两个例子就明白了

$twin.properties.desired.telemetryConfig.sendFrequency = '5m'
``
或者是:

```json
$twin.tags.deploymentLocation.floor = 1

至此消息路由我们就讲完了。

不过有需要注意的地方:

  1. 创建了自定义路由之后,消息就只会发送到自定义路由去了,不会再发送到内置的路由上,如果需要消息也仍然发送到内置的路由上,那么需要新创建一个消息路由,指向内置的终结点。
  2. 如果一个消息不能被路由到自定义路由上,而且没有Enable Fallback Route, 那么这个消息会被丢弃,如果Enable Fallback Route, 那么这样的消息会送到内置终结点上。