Skip to main content

如何使用MQTT协议和Azure IoT Hub通讯

分类:  Azure物联网 标签:  #Azure #IoT Hub # 发布于: 2023-06-13 21:09:59

Azure IoT Hub目前正式支持的是MQTT v3.1.1协议,针对于MQTT 5.0的支持在本文写作时还是public preview的阶段,同时需要注意的是Azure IoT Hub对于MQTT的实现,并没有全部实现协议本身的说明。

  • Azure IoT Hub使用端口8883支持MQTT v3.1.1
  • Azure IoT Hub使用端口443支持WebSocket之上的MQTT
  • 设备和Azure IoT Hub相连必须使用TLS/SSL加密,因此不支持非加密端口1883

链接到Azure IoT Hub

使用MQTT协议的设备可以使用如下两种方式链接到Azure IoT Hub:

  • 使用Azure提供的SDK
  • 直接使用MQTT协议。

大多数的公司会阻挡端口8883, 如果企业或者组织无法开放该端口,推荐用户使用WebSocket之上的MQTT协议,这个时候监听在443端口,大多数的企业或者公司都支持开放该端口。

使用Device SDKs

目前支持MQTT协议的设备SDK,包括JavaNode.jsC#Python, SDK使用标准的链接字符串链接到IoT Hub, 如果要使用MQTT协议链接到Azure IoT Hub需要指定参数,您可以参考下述表格来看看参数如何定义,另外需要注意的是SDK链接到Azure IoT Hub会设置两个重要的属性:

  • CleanSession, 默认是0,表示清除会话。
  • QoS: 默认是1, 参考一下MQTT协议的说明,该参数是消息传送机制:0, 表示最多传一次。1 至少传一次,2表示确保只传一次。
LanguageMQTT protocol parameterMQTT over Web Sockets protocol parameter
Node.jsazure-iot-device-mqtt.Mqttazure-iot-device-mqtt.MqttWs
JavaIotHubClientProtocol.MQTTIotHubClientProtocol.MQTT_WS
CMQTT_ProtocolMQTT_WebSocket_Protocol
C#TransportType.MqttTransportType.Mqtt falls back to MQTT over Web Sockets if MQTT fails. To specify MQTT over Web Sockets only, use TransportType.Mqtt_WebSocket_Only
PythonSupports MQTT by defaultAdd websockets=True in the call to create the client

一些实例:

Node.js:

var Client = require('azure-iot-device').Client;
var Protocol = require('azure-iot-device-mqtt').MqttWs;
var client = Client.fromConnectionString(deviceConnectionString, Protocol);

Python:

from azure.iot.device.aio import IoTHubDeviceClient
device_client = IoTHubDeviceClient.create_from_connection_string(deviceConnectionString, websockets=True)

默认的keep-alive timeout

为了保持设备和IoT Hub之间的链接一直处于活动状态,IoT Hub和设备之间需要互相发送keep-alive ping以保证链接不会被终端。设备端使用SDK来发送,默认情况如下:

LanguageDefault keep-alive intervalConfigurable
Node.js180 secondsNo
Java230 secondsNo
C240 secondsYes
C#300 secondsYes
Python60 secondsNo

至于IoT Hub向设备端发送的keep-alive ping的时间间隔,按照MQTT的标准来发送,理论上它的时间间隔应该是设备端设定的1.5倍。 不过由于Azure上所有的服务建立的链接都要收到Azure Load Balance对于idle链接处理的限制,默认情况下Azure Load Balance最大运行的idle时间是39.45分钟,也就是1767秒,因此IoT hub向设备端最大的时间是29.45分钟。

我们这里举一个例子,如何计算实际的设备不在线的时间:

假如你使用java SDK 向IoT Hub 发送keep-alive的ping, 当设备网络链接断开之后230秒,设备没有发送keep-alive的ping到IoT Hub, 因为设备不在线了,但是实际上IoT Hub并没有立即认为设备不在线,它需要等待:(230*1.5) - 230 = 115秒,然后再端口设备,并报错:4040104。

另外设备端最大能设置的keep-alive时间是:1767/1.5 = 1177秒,需要注意的是keep-alive就是idle了,任何其他的流量会立即重置keep-alive

AMQP迁移到MQTT

一些需要注意的事项:

  • AMQP支持更多的条件报错,使用MQTT之后可能需要添加更多的逻辑。
  • MQTT不支持reject,因此应答场景用Direct method方法吧。
  • Python不支持AMQP协议。

直接使用MQTT协议

如果设备上无法使用Azure SDK, 那么还是可以链接到Azure IoT Hub的服务终结点的端口8883上,直接使用MQTT协议。

当直接使用MQTT协议进行链接的时候,CONNECT包,设备侧需要使用如下的值:

  • ClientId使用设备ID(deviceId)
  • Username使用{iothubhostname}/{device_id}/?api-version=2018-06-30, 其中要注意iothubhostname是全域名的IoT Hub名字。
  • Password使用SAS, 格式:SharedAccessSignature sig={signature-string}&se={expiry}&sr={URL-encoded-resourceURI}

在模块上直接使用MQTT

  • ClientId使用{device_id}/{module_id}
  • Username使用<hubname>.azure-devices.net/{device_id}/{module_id}/?api-version=2018-06-30
  • Password同样使用SAS
  • 要发送遥测数据的话,使用Topicdevices/{device_id}/modules/{module_id}/messages/events/
  • GETPATCHtwin status等等这些和设备一致。

如果需要直接使用MQTT协议链接,必须要使用tls/ssl协议。

下面看一个简单的Python的例子:

from paho.mqtt import client as mqtt
import ssl

path_to_root_cert = "<local path to digicert.cer file>"
device_id = "<device id from device registry>"
sas_token = "<generated SAS token>"
iot_hub_name = "<iot hub name>"


def on_connect(client, userdata, flags, rc):
    print("Device connected with result code: " + str(rc))


def on_disconnect(client, userdata, rc):
    print("Device disconnected with result code: " + str(rc))


def on_publish(client, userdata, mid):
    print("Device sent message")


client = mqtt.Client(client_id=device_id, protocol=mqtt.MQTTv311)

client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.on_publish = on_publish

client.username_pw_set(username=iot_hub_name+".azure-devices.net/" +
                       device_id + "/?api-version=2018-06-30", password=sas_token)

client.tls_set(ca_certs=path_to_root_cert, certfile=None, keyfile=None,
               cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
client.tls_insecure_set(False)

client.connect(iot_hub_name+".azure-devices.net", port=8883)

client.publish("devices/" + device_id + "/messages/events/", '{"id":123}', qos=1)
client.loop_forever()

其他的动作和使用到的TOPIC

  • Sending device-to-cloud messages: devices/{device_id}/messages/events/ or devices/{device_id}/messages/events/{property_bag}
  • Receiving cloud-to-device messages: devices/{device_id}/messages/devicebound/# as a Topic Filter
  • Retrieving a device twin's properties: $iothub/twin/res/#
  • Respond to a direct method: ParseError: KaTeX parse error: Expected 'EOF', got '#' at position 21: …b/methods/POST/#̲, 以及:iothub/methods/POST/{method name}/?$rid={request id}

更详细的内容,请仔细参考文档:https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-mqtt-support