如何使用AMQP协议和Azure IoT Hub通讯

分类:  Azure物联网 标签:  #Azure #IoT Hub # 发布于: 2023-06-13 21:07:58

AMQP协议不仅可以用在设备端,也可以用在服务端,而MQTT只能用在设备端。Azure IoT Hub支持AMQP 1.0。除了可以直接使用Azure SDK之外,你也可以直接使用AMQP client,通过Azure IoT Hub提供的两个endpoint完成同样的操作。

Service Client

通过认证连接到Azure IoT Hub

为了连接到Azure IoT Hub, 客户端可以使用CBS或者SASL认证。Service Client要求如下的信息:

  • IoT Hub Hostname: 例如:<iot-hub-name>.azure-devices.net
  • Key Name: service
  • Access Key: 主要或者是次要的服务key
  • SAS Key: 在Azure Portal上生成一个:SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI}

下面的代码演示了如何通过uAMQP Python库来连接到IoT Hub:

import uamqp
import urllib.parse
import time

# generate_sas_token is a local helper; a reference implementation is given
# in the "SAS token structure" section of the Azure IoT Hub security
# documentation.
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
# The AMQP endpoint is the hub's fully qualified host name, not the bare hub
# name. Sovereign clouds use a different suffix (e.g. '.azure-devices.cn').
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '<operation-link-name>'  # example: '/messages/devicebound'

# Service-side user name has the form {policy_name}@sas.root.{iot_hub_name}
username = '{policy_name}@sas.root.{iot_hub_name}'.format(
    iot_hub_name=iot_hub_name, policy_name=policy_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
# The user name and token are embedded in the URI, so both must be
# percent-encoded (quote_plus lives in urllib.parse on Python 3).
uri = 'amqps://{}:{}@{}{}'.format(urllib.parse.quote_plus(username),
                                  urllib.parse.quote_plus(sas_token), hostname, operation)

# Create a send or receive client
send_client = uamqp.SendClient(uri, debug=True)
receive_client = uamqp.ReceiveClient(uri, debug=True)


Service Client使用如下的link来发送C2D消息和接收反馈:

| Created by | Link type | Link path | Description |
| --- | --- | --- | --- |
| Service | Sender link | /messages/devicebound | Cloud-to-device messages that are destined for devices are sent to this link by the service. Messages sent over this link have their To property set to the target device's receiver link path, /devices/<deviceID>/messages/devicebound. |
| Service | Receiver link | /messages/serviceBound/feedback | Completion, rejection, and abandonment feedback messages that come from devices are received on this link by the service. For more information about feedback messages, see Send cloud-to-device messages from an IoT hub. |


默认情况Azure IoT Hub使用内置的Event hub来存储遥测数据,但是service client可以使用AMQP协议来获取遥测数据,步骤如下:

  • 连接并登录
  • 拿到consumer group以及partition id, 然后从如下类似路径读取:/messages/events/ConsumerGroups/<consumer_group>/Partitions/<partition_id>(默认的consumer group是 $Default)。


import json
import uamqp
import urllib.parse
import time

# generate_sas_token is a local helper; a reference implementation is given
# in the "SAS token structure" section of the Azure IoT Hub security
# documentation.
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
# The AMQP endpoint is the hub's fully qualified host name, not the bare hub
# name. Sovereign clouds use a different suffix (e.g. '.azure-devices.cn').
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
# Telemetry is read per consumer group and partition of the built-in endpoint
operation = '/messages/events/ConsumerGroups/{consumer_group}/Partitions/{p_id}'.format(
    consumer_group='$Default', p_id=0)

username = '{policy_name}@sas.root.{iot_hub_name}'.format(
    policy_name=policy_name, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
# The user name and token are embedded in the URI, so both must be
# percent-encoded (quote_plus lives in urllib.parse on Python 3).
uri = 'amqps://{}:{}@{}{}'.format(urllib.parse.quote_plus(username),
                                  urllib.parse.quote_plus(sas_token), hostname, operation)

# Optional filtering predicates can be specified by using endpoint_filter
# Valid predicates include:
# - amqp.annotation.x-opt-sequence-number
# - amqp.annotation.x-opt-offset
# - amqp.annotation.x-opt-enqueued-time
# Set endpoint_filter variable to None if no filter is needed
endpoint_filter = b'amqp.annotation.x-opt-sequence-number > 2995'

# Helper function to set the filtering predicate on the source URI

def set_endpoint_filter(uri, endpoint_filter=''):
    """Return the receive source for ``uri``, optionally restricted by a filter.

    Args:
        uri: AMQP address of the endpoint to receive from.
        endpoint_filter: Selector predicate, for example
            ``b'amqp.annotation.x-opt-sequence-number > 2995'``. An empty
            value or ``None`` means that no filter is wanted.

    Returns:
        ``uri`` unchanged when no filter is requested; otherwise a
        ``uamqp.address.Source`` for ``uri`` with the selector filter set.
    """
    if not endpoint_filter:
        return uri
    # AMQP symbol that marks the value as a string selector filter
    edge_filter_descriptor = b'apache.org:selector-filter:string'
    source_uri = uamqp.address.Source(uri)
    source_uri.set_filter(endpoint_filter, edge_filter_descriptor)
    return source_uri

receive_client = uamqp.ReceiveClient(
    set_endpoint_filter(uri, endpoint_filter), debug=True)
try:
    # The first attempt may be answered with a redirect to another address
    batch = receive_client.receive_message_batch(max_batch_size=5)
except uamqp.errors.LinkRedirect as redirect:
    # Once a redirect error is received, close the original client and recreate a new one to the re-directed address
    receive_client.close()

    sas_auth = uamqp.authentication.SASTokenAuth.from_shared_access_key(
        redirect.address, policy_name, access_key)
    receive_client = uamqp.ReceiveClient(set_endpoint_filter(
        redirect.address, endpoint_filter), auth=sas_auth, debug=True)

    # Start receiving messages in batches from the redirected address.
    # This is done only on redirect so that a batch already returned by the
    # first attempt is processed below instead of being thrown away.
    batch = receive_client.receive_message_batch(max_batch_size=5)

for msg in batch:
    print('*** received a message ***')
    print('\t: ' + str(msg.annotations['x-opt-sequence-number']))
    print('\t: ' + str(msg.annotations['x-opt-offset']))
    print('\t: ' + str(msg.annotations['x-opt-enqueued-time']))

Device Client

Device client这一侧和service client登录上基本是一致的。


import uamqp
import urllib.parse
import uuid

# generate_sas_token is a local helper; a reference implementation is given
# in the "SAS token structure" section of the Azure IoT Hub security
# documentation.
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
# The AMQP endpoint is the hub's fully qualified host name, not the bare hub
# name. Sovereign clouds use a different suffix (e.g. '.azure-devices.cn').
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
device_id = '<device-id>'
access_key = '<primary-or-secondary-key>'
# Device-side user name has the form {device_id}@sas.{iot_hub_name}
username = '{device_id}@sas.{iot_hub_name}'.format(
    device_id=device_id, iot_hub_name=iot_hub_name)
# The token is issued for the device resource and carries no policy name
sas_token = generate_sas_token('{hostname}/devices/{device_id}'.format(
    hostname=hostname, device_id=device_id), access_key, None)

# e.g., '/devices/{device_id}/messages/devicebound'
operation = '<operation-link-name>'
# The user name and token are embedded in the URI, so both must be
# percent-encoded (quote_plus lives in urllib.parse on Python 3).
uri = 'amqps://{}:{}@{}{}'.format(urllib.parse.quote_plus(username),
                                  urllib.parse.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
send_client = uamqp.SendClient(uri, debug=True)



# ...
# (continues from the device client connection code: uamqp, username,
# sas_token, hostname and device_id are defined there)
import urllib.parse  # provides quote_plus on Python 3

# Create a receive client for the cloud-to-device receive link on the device
operation = '/devices/{device_id}/messages/devicebound'.format(
    device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.parse.quote_plus(username),
                                  urllib.parse.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
# Poll for cloud-to-device messages indefinitely; stop with Ctrl+C
while True:
    batch = receive_client.receive_message_batch(max_batch_size=5)
    for msg in batch:
        print('*** received a message ***')

        # Property 'to' is set to: '/devices/device1/messages/devicebound',
        print('\tto:                     ' + str(msg.properties.to))

        # Property 'message_id' is set to value provided by the service
        print('\tmessage_id:             ' + str(msg.properties.message_id))

        # Other properties are present if they were provided by the service
        print('\tcreation_time:          ' + str(msg.properties.creation_time))
        print('\tcorrelation_id:         ' +
              str(msg.properties.correlation_id))
        print('\tcontent_type:           ' + str(msg.properties.content_type))
        print('\treply_to_group_id:      ' +
              str(msg.properties.reply_to_group_id))
        print('\tsubject:                ' + str(msg.properties.subject))
        print('\tuser_id:                ' + str(msg.properties.user_id))
        print('\tgroup_sequence:         ' +
              str(msg.properties.group_sequence))
        print('\tcontent_encoding:       ' +
              str(msg.properties.content_encoding))
        print('\treply_to:               ' + str(msg.properties.reply_to))
        print('\tabsolute_expiry_time:   ' +
              str(msg.properties.absolute_expiry_time))
        print('\tgroup_id:               ' + str(msg.properties.group_id))

        # Message sequence number in the built-in event hub
        print('\tx-opt-sequence-number:  ' +
              str(msg.annotations['x-opt-sequence-number']))


# ...
# (continues from the device client connection code: uamqp, uuid, username,
# sas_token, hostname and device_id are defined there)
import urllib.parse  # provides quote_plus on Python 3

# Create a send client for the device-to-cloud send link on the device
operation = '/devices/{device_id}/messages/events'.format(device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.parse.quote_plus(username),
                                  urllib.parse.quote_plus(sas_token), hostname, operation)

send_client = uamqp.SendClient(uri, debug=True)

# Set any of the applicable message properties
msg_props = uamqp.message.MessageProperties()
msg_props.message_id = str(uuid.uuid4())
msg_props.creation_time = None
msg_props.correlation_id = None
msg_props.content_type = None
msg_props.reply_to_group_id = None
msg_props.subject = None
msg_props.user_id = None
msg_props.group_sequence = None
msg_props.content_encoding = None
msg_props.reply_to = None
msg_props.absolute_expiry_time = None
msg_props.group_id = None

# Application properties in the message (if any)
application_properties = {"app_property_key": "app_property_value"}

# Create message
msg_data = b"Your message payload goes here"
message = uamqp.Message(
    msg_data, properties=msg_props, application_properties=application_properties)

# The message must be queued first; send_all_messages() only sends what is
# already in the client's queue.
send_client.queue_message(message)
results = send_client.send_all_messages()

# Report every message that could not be delivered
for result in results:
    if result == uamqp.constants.MessageState.SendFailed:
        print(result)

AMQP 1.0协议的细节可以参考OASIS发布的AMQP 1.0规范(AMQP v1.0 specification)。