How to communicate with Azure IoT Hub using the AMQP protocol
Category: Azure IoT ◆ Tags: #Azure #IoT Hub ◆ Published: 2023-06-13 21:07:58

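AMQP can be used on both the device side and the service side, whereas MQTT can only be used on the device side. Azure IoT Hub supports AMQP 1.0. Besides using the Azure SDK, you can use an AMQP client directly against the two endpoints that Azure IoT Hub provides to perform the same operations.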
Service Client
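Connecting and authenticating to Azure IoT Hub
To connect to Azure IoT Hub, a client can authenticate using either CBS (claims-based security) or SASL. The service client requires the following information: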
- IoT Hub Hostname: for example, <iot-hub-name>.azure-devices.net
- Key Name: service
- Access Key: the primary or secondary key of the service policy
- SAS Key: a shared access signature, which you can generate in Azure Portal, in the following format: SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI}
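The Python samples in this article import generate_sas_token from a local helper module. As a minimal sketch of what such a helper could look like, assuming the token format listed above and an HMAC-SHA256 signature over the URL-encoded resource URI and the expiry (the one-hour default for expiry is an assumption, not something this article specifies):

```python
from base64 import b64decode, b64encode
from hashlib import sha256
from hmac import HMAC
from time import time
from urllib import parse


def generate_sas_token(uri, key, policy_name, expiry=3600):
    """Build a SharedAccessSignature token for the given resource URI.

    uri         -- resource URI, e.g. '<iot-hub-name>.azure-devices.net'
    key         -- base64-encoded primary or secondary key
    policy_name -- shared access policy name, or None for a device key
    expiry      -- token lifetime in seconds (assumed default: one hour)
    """
    ttl = int(time() + expiry)
    # The string to sign is the URL-encoded resource URI, a newline, and the expiry.
    to_sign = '{}\n{}'.format(parse.quote_plus(uri), ttl)
    signature = b64encode(
        HMAC(b64decode(key), to_sign.encode('utf-8'), sha256).digest()
    ).decode('ascii')

    fields = {'sr': uri, 'sig': signature, 'se': str(ttl)}
    if policy_name is not None:
        fields['skn'] = policy_name
    # urlencode percent-encodes each value, including the signature.
    return 'SharedAccessSignature ' + parse.urlencode(fields)
```

A service token is signed for the hub hostname and carries the policy name in skn. A device token is signed for the device's own resource URI and passes None as the policy name, so skn is omitted.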
The following code shows how to connect to IoT Hub using the uAMQP Python library:

    import urllib.parse

    import uamqp

    # Use the generate_sas_token implementation available here:
    # https://docs.microsoft.com/azure/iot-hub/iot-hub-devguide-security#security-token-structure
    from helper import generate_sas_token

    iot_hub_name = '<iot-hub-name>'
    hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
    policy_name = 'service'
    access_key = '<primary-or-secondary-key>'
    operation = '<operation-link-name>'  # example: '/messages/devicebound'

    username = '{policy_name}@sas.root.{iot_hub_name}'.format(
        iot_hub_name=iot_hub_name, policy_name=policy_name)
    sas_token = generate_sas_token(hostname, access_key, policy_name)
    # Python 3: quote_plus lives in urllib.parse
    uri = 'amqps://{}:{}@{}{}'.format(urllib.parse.quote_plus(username),
                                      urllib.parse.quote_plus(sas_token), hostname, operation)

    # Create a send or receive client
    send_client = uamqp.SendClient(uri, debug=True)
    receive_client = uamqp.ReceiveClient(uri, debug=True)

Sending cloud-to-device (C2D) messages
The service client uses the following links to send C2D messages and to receive feedback:
Created by | Link type | Link path | Description |
---|---|---|---|
Service | Sender link | /messages/devicebound | Cloud-to-device messages that are destined for devices are sent to this link by the service. Messages sent over this link have their To property set to the target device's receiver link path, /devices/ |
Service | Receiver link | /messages/serviceBound/feedback | Completion, rejection, and abandonment feedback messages that come from devices are received on this link by the service. For more information about feedback messages, see Send cloud-to-device messages from an IoT hub. |
Receiving telemetry
By default, Azure IoT Hub stores telemetry in its built-in event hub, and a service client can read that telemetry over AMQP. The steps are:
- Connect and authenticate.
- Obtain the consumer group and partition ID, then read from a path like the following: /messages/events/ConsumerGroups/<consumer_group>/Partitions/<partition_id> (the default consumer group is $Default).
Sample Python code:

    import urllib.parse

    import uamqp

    # Use the generate_sas_token implementation that's available here:
    # https://docs.microsoft.com/azure/iot-hub/iot-hub-devguide-security#security-token-structure
    from helper import generate_sas_token

    iot_hub_name = '<iot-hub-name>'
    hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
    policy_name = 'service'
    access_key = '<primary-or-secondary-key>'
    operation = '/messages/events/ConsumerGroups/{consumer_group}/Partitions/{p_id}'.format(
        consumer_group='$Default', p_id=0)

    username = '{policy_name}@sas.root.{iot_hub_name}'.format(
        policy_name=policy_name, iot_hub_name=iot_hub_name)
    sas_token = generate_sas_token(hostname, access_key, policy_name)
    uri = 'amqps://{}:{}@{}{}'.format(urllib.parse.quote_plus(username),
                                      urllib.parse.quote_plus(sas_token), hostname, operation)

    # Optional filtering predicates can be specified by using endpoint_filter
    # Valid predicates include:
    # - amqp.annotation.x-opt-sequence-number
    # - amqp.annotation.x-opt-offset
    # - amqp.annotation.x-opt-enqueued-time
    # Set endpoint_filter variable to None if no filter is needed
    endpoint_filter = b'amqp.annotation.x-opt-sequence-number > 2995'

    # Helper function to set the filtering predicate on the source URI
    def set_endpoint_filter(uri, endpoint_filter=None):
        source_uri = uamqp.address.Source(uri)
        if endpoint_filter:  # skip when no filter is requested
            source_uri.set_filter(endpoint_filter)
        return source_uri

    receive_client = uamqp.ReceiveClient(
        set_endpoint_filter(uri, endpoint_filter), debug=True)
    try:
        batch = receive_client.receive_message_batch(max_batch_size=5)
    except uamqp.errors.LinkRedirect as redirect:
        # Once a redirect error is received, close the original client and
        # recreate a new one to the re-directed address
        receive_client.close()

        sas_auth = uamqp.authentication.SASTokenAuth.from_shared_access_key(
            redirect.address, policy_name, access_key)
        receive_client = uamqp.ReceiveClient(set_endpoint_filter(
            redirect.address, endpoint_filter), auth=sas_auth, debug=True)

    # Start receiving messages in batches
    batch = receive_client.receive_message_batch(max_batch_size=5)
    for msg in batch:
        print('*** received a message ***')
        # Assumes a binary (data) body; the chunks are bytes on Python 3
        print(b''.join(msg.get_data()).decode('utf-8'))
        # Annotation keys are byte strings on Python 3
        print('\tsequence-number: ' + str(msg.annotations[b'x-opt-sequence-number']))
        print('\toffset: ' + str(msg.annotations[b'x-opt-offset']))
        print('\tenqueued-time: ' + str(msg.annotations[b'x-opt-enqueued-time']))

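Each partition of the built-in event hub has its own receive path, and the filter predicate is just a byte string. A small sketch of helpers that assemble both follows. The function names and the partition count of 4 in the usage lines are illustrative assumptions; the real partition count depends on how the hub was created.

```python
def events_path(consumer_group='$Default', partition_id=0):
    """Return the receive link path for one partition of the built-in event hub."""
    return '/messages/events/ConsumerGroups/{}/Partitions/{}'.format(
        consumer_group, partition_id)


def sequence_number_filter(last_seen):
    """Return a filter predicate selecting messages after a sequence number."""
    return 'amqp.annotation.x-opt-sequence-number > {}'.format(
        int(last_seen)).encode('utf-8')


# Hypothetical hub with 4 partitions: one path (and one receiver) per partition.
paths = [events_path(partition_id=p) for p in range(4)]
resume_filter = sequence_number_filter(2995)
```

Remembering the last sequence number seen on each partition and passing it to the filter lets a reader resume where it left off instead of re-reading the partition from the start.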
Device Client
On the device client side, connecting and authenticating works essentially the same way as for the service client.
For connection and authentication, refer to the following code:

    import urllib.parse
    import uuid

    import uamqp

    # Use the generate_sas_token implementation available here:
    # https://docs.microsoft.com/azure/iot-hub/iot-hub-devguide-security#security-token-structure
    from helper import generate_sas_token

    iot_hub_name = '<iot-hub-name>'
    hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
    device_id = '<device-id>'
    access_key = '<primary-or-secondary-key>'

    username = '{device_id}@sas.{iot_hub_name}'.format(
        device_id=device_id, iot_hub_name=iot_hub_name)
    # The device token is signed for the device's own resource URI, without a policy name
    sas_token = generate_sas_token('{hostname}/devices/{device_id}'.format(
        hostname=hostname, device_id=device_id), access_key, None)

    # e.g., '/devices/{device_id}/messages/devicebound'
    operation = '<operation-link-name>'
    uri = 'amqps://{}:{}@{}{}'.format(urllib.parse.quote_plus(username),
                                      urllib.parse.quote_plus(sas_token), hostname, operation)

    receive_client = uamqp.ReceiveClient(uri, debug=True)
    send_client = uamqp.SendClient(uri, debug=True)

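The service and device connections differ only in the SASL username and in the resource the SAS token is signed for. The following sketch puts the two side by side using only the standard library. The helper names and the placeholder token are made up for illustration and are not part of uAMQP.

```python
from urllib.parse import quote_plus


def service_credentials(iot_hub_name, policy_name):
    """Username and token resource URI for a service client."""
    hostname = '{}.azure-devices.net'.format(iot_hub_name)
    username = '{}@sas.root.{}'.format(policy_name, iot_hub_name)
    return username, hostname


def device_credentials(iot_hub_name, device_id):
    """Username and token resource URI for a device client."""
    hostname = '{}.azure-devices.net'.format(iot_hub_name)
    username = '{}@sas.{}'.format(device_id, iot_hub_name)
    return username, '{}/devices/{}'.format(hostname, device_id)


def build_amqp_uri(username, sas_token, hostname, operation):
    """Percent-encode the credentials and assemble the amqps:// URI."""
    return 'amqps://{}:{}@{}{}'.format(
        quote_plus(username), quote_plus(sas_token), hostname, operation)


# Illustrative values only; '<sas-token>' stands in for a real token.
username, resource_uri = device_credentials('myhub', 'device1')
uri = build_amqp_uri(username, '<sas-token>', 'myhub.azure-devices.net',
                     '/devices/device1/messages/events')
```

The credentials must be percent-encoded because both the username (which contains @) and the token (which contains spaces, = and &) would otherwise break the URI.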
Receiving cloud-to-device (C2D) messages
The code speaks for itself:

    import urllib.parse  # quote_plus lives here on Python 3

    # ... (connection setup as in the device login code above)

    # Create a receive client for the cloud-to-device receive link on the device
    operation = '/devices/{device_id}/messages/devicebound'.format(
        device_id=device_id)
    uri = 'amqps://{}:{}@{}{}'.format(urllib.parse.quote_plus(username),
                                      urllib.parse.quote_plus(sas_token), hostname, operation)

    receive_client = uamqp.ReceiveClient(uri, debug=True)
    while True:
        batch = receive_client.receive_message_batch(max_batch_size=5)
        for msg in batch:
            print('*** received a message ***')
            # Assumes a binary (data) body; the chunks are bytes on Python 3
            print(b''.join(msg.get_data()).decode('utf-8'))

            # Property 'to' is set to: '/devices/device1/messages/devicebound',
            print('\tto: ' + str(msg.properties.to))

            # Property 'message_id' is set to value provided by the service
            print('\tmessage_id: ' + str(msg.properties.message_id))

            # Other properties are present if they were provided by the service
            print('\tcreation_time: ' + str(msg.properties.creation_time))
            print('\tcorrelation_id: ' + str(msg.properties.correlation_id))
            print('\tcontent_type: ' + str(msg.properties.content_type))
            print('\treply_to_group_id: ' + str(msg.properties.reply_to_group_id))
            print('\tsubject: ' + str(msg.properties.subject))
            print('\tuser_id: ' + str(msg.properties.user_id))
            print('\tgroup_sequence: ' + str(msg.properties.group_sequence))
            print('\tcontent_encoding: ' + str(msg.properties.content_encoding))
            print('\treply_to: ' + str(msg.properties.reply_to))
            print('\tabsolute_expiry_time: ' + str(msg.properties.absolute_expiry_time))
            print('\tgroup_id: ' + str(msg.properties.group_id))

            # Message sequence number in the built-in event hub
            # (annotation keys are byte strings on Python 3)
            print('\tx-opt-sequence-number: ' +
                  str(msg.annotations[b'x-opt-sequence-number']))

Sending telemetry (D2C)

    import urllib.parse  # quote_plus lives here on Python 3

    # ... (connection setup as in the device login code above)

    # Create a send client for the device-to-cloud send link on the device
    operation = '/devices/{device_id}/messages/events'.format(device_id=device_id)
    uri = 'amqps://{}:{}@{}{}'.format(urllib.parse.quote_plus(username),
                                      urllib.parse.quote_plus(sas_token), hostname, operation)

    send_client = uamqp.SendClient(uri, debug=True)

    # Set any of the applicable message properties
    msg_props = uamqp.message.MessageProperties()
    msg_props.message_id = str(uuid.uuid4())
    msg_props.creation_time = None
    msg_props.correlation_id = None
    msg_props.content_type = None
    msg_props.reply_to_group_id = None
    msg_props.subject = None
    msg_props.user_id = None
    msg_props.group_sequence = None
    msg_props.to = None
    msg_props.content_encoding = None
    msg_props.reply_to = None
    msg_props.absolute_expiry_time = None
    msg_props.group_id = None

    # Application properties in the message (if any)
    application_properties = {"app_property_key": "app_property_value"}

    # Create message
    msg_data = b"Your message payload goes here"
    message = uamqp.Message(msg_data, properties=msg_props,
                            application_properties=application_properties)

    send_client.queue_message(message)
    results = send_client.send_all_messages()

    # Report any message that failed to send
    for result in results:
        if result == uamqp.constants.MessageState.SendFailed:
            print(result)

Note
The AMQP 1.0 specification is available at: https://www.amqp.org/sites/amqp.org/files/amqp.pdf