Designing an Azure IoT Device Client with Automatic Reconnection (.NET Device SDK)
Category: Azure IoT ◆ Tags: #Azure #IoT Hub ◆ Published: 2023-06-13 21:03:09

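In this chapter we walk through a sample from GitHub to learn how to design a well-behaved device client. The design takes the following into account:
- It monitors the device's connection status and decides how to handle the connection based on that status.
- Only one device client instance per device is online at any time.
- It responds to the necessary actions (desired-property updates and cloud-to-device messages).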
Note: all code in this section targets .NET 6.
Create a New Project
Create a new project with the following commands:
```shell
dotnet new console -o ReconnectDeviceSample
cd ReconnectDeviceSample
dotnet add package Microsoft.Azure.Devices.Client
dotnet add package Microsoft.Extensions.Logging.Console
code .
```
After opening the project in VS Code, create a file named GlobalUsings.cs with the following content:
```csharp
global using System.Net;
global using System.Net.Http;
global using System.Net.Sockets;
global using System.Net.WebSockets;
global using System.Runtime.InteropServices;
global using System.Security.Authentication;
global using System.Text; // Encoding and StringBuilder live here, not in System.Text.Encodings
global using DotNetty.Transport.Channels;
global using Microsoft.Azure.Devices.Client;
global using Microsoft.Azure.Devices.Client.Exceptions;
global using Microsoft.Azure.Devices.Shared;
global using Microsoft.Extensions.Logging;
global using Microsoft.Extensions.Logging.Console;
global using ReconnectDeviceSample;
```
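As in our earlier programs, we start by adding the main class for the demo, DeviceReconnectionSample.cs.
In this class we again use file-scoped namespaces (a C# 10 feature that ships with .NET 6) and define the skeleton of the class:
```csharp
namespace ReconnectDeviceSample;

public class DeviceReconnectionSample
{
}
```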
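Next, consider the fields and properties this class needs:
- The class represents the device, so, as before, it simulates temperature readings for demonstration purposes (`private const int TemperatureThreshold = 30;`).
- AMQP transports need slightly different handling, particularly when receiving messages, so we define a list of the AMQP transport types.
- More importantly, the device has a single device ID, so it should hold at most one connection to IoT Hub. A semaphore ensures that only one caller at a time can create the device client.
- Some exceptions should be ignored by the retry logic. An example is authentication failures (UnauthorizedException), which are reported through the connection status change handler instead.
- The device client is declared static, and so is the connection status.
Here are all the fields and properties: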
```csharp
private const int TemperatureThreshold = 30;

private static readonly TransportType[] _amqpTransports = new[]
{
    TransportType.Amqp,
    TransportType.Amqp_Tcp_Only,
    TransportType.Amqp_WebSocket_Only
};

private static readonly Random s_randomGenerator = new Random();
private static readonly TimeSpan s_sleepDuration = TimeSpan.FromSeconds(5);

// Lets only one caller at a time dispose and re-create the device client.
private readonly SemaphoreSlim _initSemaphore = new SemaphoreSlim(1, 1);
private readonly List<string> _deviceConnectionStrings;
private readonly TransportType _transportType;
private readonly ClientOptions _clientOptions = new ClientOptions { SdkAssignsMessageId = SdkAssignsMessageId.WhenUnset };

// An UnauthorizedException is handled in the connection status change handler through its corresponding status change event.
// We will ignore this exception when thrown by the client API operation.
private readonly Dictionary<Type, string> _exceptionsToBeIgnored = new Dictionary<Type, string>
{
    { typeof(UnauthorizedException), "Unauthorized exceptions are handled by the ConnectionStatusChangeHandler." }
};

private readonly ILogger _logger;

// Mark these fields as volatile so that their latest values are referenced.
private static volatile DeviceClient s_deviceClient;
private static volatile ConnectionStatus s_connectionStatus = ConnectionStatus.Disconnected;
private static CancellationTokenSource s_cancellationTokenSource;

// Whether the device is currently connected.
private bool IsDeviceConnected => s_connectionStatus == ConnectionStatus.Connected;
```
Define the constructor. Its main job is to check that at least one connection string was supplied, for example the device's primary and secondary connection strings:
```csharp
public DeviceReconnectionSample(List<string> deviceConnectionStrings, TransportType transportType, ILogger logger)
{
    _logger = logger;

    // This class takes a list of potentially valid connection strings (most likely the currently known good primary and secondary keys)
    // and will attempt to connect with the first. If it receives feedback that a connection string is invalid, it will discard it, and
    // if any more are remaining, will try the next one.
    // To test this, either pass an invalid connection string as the first one, or rotate it while the sample is running, and wait about
    // 5 minutes.
    if (deviceConnectionStrings == null || !deviceConnectionStrings.Any())
    {
        throw new ArgumentException("At least one connection string must be provided.", nameof(deviceConnectionStrings));
    }
    _deviceConnectionStrings = deviceConnectionStrings;
    _logger.LogInformation($"Supplied with {_deviceConnectionStrings.Count} connection string(s).");

    _transportType = transportType;
    _logger.LogInformation($"Using {_transportType} transport.");
}
```
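The constructor comment describes a failover strategy: try the first string, drop it on a credential failure, then move on to the next one. That strategy can be shown in isolation with a plain list. This is a minimal, SDK-free sketch. `EnsureAtLeastOne` and `TryFailover` are hypothetical helpers, and the strings are placeholders rather than real connection strings.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Placeholder values; in practice these are the device's primary and secondary connection strings.
var candidates = new List<string> { "<primary-connection-string>", "<secondary-connection-string>" };
EnsureAtLeastOne(candidates);

Console.WriteLine($"Using: {candidates.First()}");

// Simulate two Bad_Credential notifications in a row.
bool hasFirstFallback = TryFailover(candidates);   // switches to the secondary string
Console.WriteLine($"Fallback available: {hasFirstFallback}, now using: {candidates.FirstOrDefault()}");
bool hasSecondFallback = TryFailover(candidates);  // nothing left; the sample cancels the run at this point
Console.WriteLine($"Fallback available: {hasSecondFallback}");

// Mirrors the constructor guard: an empty list is a configuration error.
static void EnsureAtLeastOne(List<string> connectionStrings)
{
    if (connectionStrings == null || !connectionStrings.Any())
    {
        throw new ArgumentException("At least one connection string must be provided.", nameof(connectionStrings));
    }
}

// Mirrors the Bad_Credential branch: discard the current string and report whether another one remains.
static bool TryFailover(List<string> connectionStrings)
{
    if (connectionStrings.Count > 0)
    {
        connectionStrings.RemoveAt(0);
    }
    return connectionStrings.Any();
}
```

Like the sample, `TryFailover` mutates the list it is given, so the caller's list shrinks as credentials are discarded. Copy the list first if the caller still needs the original.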
Define the method that runs the sample. It follows the same pattern as before:
```csharp
public async Task RunSampleAsync(TimeSpan sampleRunningTime)
{
    s_cancellationTokenSource = new CancellationTokenSource(sampleRunningTime);
    Console.CancelKeyPress += (sender, eventArgs) =>
    {
        eventArgs.Cancel = true;
        s_cancellationTokenSource.Cancel();
        _logger.LogInformation("Sample execution cancellation requested; will exit.");
    };

    _logger.LogInformation("Sample execution started, press Control+C to quit the sample.");

    try
    {
        await InitializeAndSetupClientAsync(s_cancellationTokenSource.Token);
        await Task.WhenAll(SendMessagesAsync(s_cancellationTokenSource.Token), ReceiveMessagesAsync(s_cancellationTokenSource.Token));
    }
    catch (Exception ex)
    {
        _logger.LogError($"Unrecoverable exception caught, user action is required, so exiting: \n{ex}");
        s_cancellationTokenSource.Cancel();
    }

    _initSemaphore.Dispose();
    s_cancellationTokenSource.Dispose();
}
```
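Points to note:
- We create a cancellation token source. It is cancelled when the run time elapses or when the user presses Ctrl+C.
- InitializeAndSetupClientAsync initializes the device client.
- Task.WhenAll waits until both the telemetry-sending loop and the message-receiving loop have completed.
- A semaphore controls access to device client initialization.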
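Here is a stripped-down sketch of the same lifecycle with no IoT Hub involved. It has a CancellationTokenSource that fires after a run time, a Ctrl+C handler that cancels instead of killing the process, and two loops joined by Task.WhenAll. The one-second run time and the counter loops are stand-ins (assumptions) for `sampleRunningTime` and the send/receive methods.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Stop after a fixed run time (the sample passes Timeout.InfiniteTimeSpan instead).
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1));

// Ctrl+C requests cancellation instead of terminating the process.
Console.CancelKeyPress += (sender, e) =>
{
    e.Cancel = true;   // keep the process alive so the loops can exit cleanly
    cts.Cancel();
};

int sent = 0, received = 0;

// Two independent loops, analogous to SendMessagesAsync and ReceiveMessagesAsync.
Task sendLoop = RunLoopAsync(() => Interlocked.Increment(ref sent), cts.Token);
Task receiveLoop = RunLoopAsync(() => Interlocked.Increment(ref received), cts.Token);

// WhenAll completes only after both loops have observed cancellation and returned.
await Task.WhenAll(sendLoop, receiveLoop);
Console.WriteLine($"Cancelled={cts.IsCancellationRequested}, sent={sent}, received={received}");

static async Task RunLoopAsync(Action work, CancellationToken token)
{
    while (!token.IsCancellationRequested)
    {
        work();
        // The token is deliberately not passed to Task.Delay (see the note below).
        await Task.Delay(TimeSpan.FromMilliseconds(100));
    }
}
```

The loops call `Task.Delay` without the token, as the sample's loops do. If the token were passed, the delay would throw `TaskCanceledException` on cancellation, and `RunSampleAsync` would then log that as an unrecoverable error.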
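Let's start with the method that initializes the device client. Note that it relies on several helper methods and classes:
- ShouldClientBeInitialized decides whether the client needs to be (re)initialized. We covered earlier which connection states require initialization.
- Initializing a DeviceClient involves two steps: creating it from the connection string, and calling OpenAsync to open the connection.
- s_deviceClient.SetConnectionStatusChangesHandler(ConnectionStatusChangeHandler) registers a handler that is notified whenever the connection status changes.
- The helper RetryOperationHelper.RetryTransientExceptionsAsync retries operations that fail with transient errors.
- A semaphore ensures that only one caller at a time initializes the device client.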
```csharp
private async Task InitializeAndSetupClientAsync(CancellationToken cancellationToken)
{
    if (ShouldClientBeInitialized(s_connectionStatus))
    {
        // Allow a single thread to dispose and initialize the client instance.
        await _initSemaphore.WaitAsync();
        try
        {
            // Check again: another caller may have initialized the client while we waited for the semaphore.
            if (ShouldClientBeInitialized(s_connectionStatus))
            {
                _logger.LogDebug($"Attempting to initialize the client instance, current status={s_connectionStatus}");

                // If the device client instance has been previously initialized, then dispose it.
                if (s_deviceClient != null)
                {
                    await s_deviceClient.CloseAsync();
                    s_deviceClient.Dispose();
                    s_deviceClient = null;
                }

                s_deviceClient = DeviceClient.CreateFromConnectionString(_deviceConnectionStrings.First(), _transportType, _clientOptions);
                s_deviceClient.SetConnectionStatusChangesHandler(ConnectionStatusChangeHandler);
                _logger.LogDebug("Initialized the client instance.");
            }
        }
        finally
        {
            _initSemaphore.Release();
        }

        // Force connection now.
        // We have set the "shouldExecuteOperation" function to always try to open the connection.
        // OpenAsync() is an idempotent call, it has the same effect if called once or multiple times on the same client.
        await RetryOperationHelper.RetryTransientExceptionsAsync(
            operationName: "OpenConnection",
            asyncOperation: async () => await s_deviceClient.OpenAsync(cancellationToken),
            shouldExecuteOperation: () => true,
            logger: _logger,
            exceptionsToBeIgnored: _exceptionsToBeIgnored,
            cancellationToken: cancellationToken);
        _logger.LogDebug("The client instance has been opened.");

        // You will need to subscribe to the client callbacks any time the client is initialized.
        // The cancellation token is passed as the callback's userContext; the handler casts it back.
        await RetryOperationHelper.RetryTransientExceptionsAsync(
            operationName: "SubscribeTwinUpdates",
            asyncOperation: async () => await s_deviceClient.SetDesiredPropertyUpdateCallbackAsync(HandleTwinUpdateNotificationsAsync, cancellationToken),
            shouldExecuteOperation: () => IsDeviceConnected,
            logger: _logger,
            exceptionsToBeIgnored: _exceptionsToBeIgnored,
            cancellationToken: cancellationToken);
        _logger.LogDebug("The client has subscribed to desired property update notifications.");
    }
}
```
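InitializeAndSetupClientAsync follows a check, lock, re-check shape. This is double-checked locking built on `SemaphoreSlim`, which, unlike `lock`, can be awaited. The sketch below isolates the pattern under assumed names: `client` stands in for `s_deviceClient`, and `InitializeIfNeededAsync` stands in for the initialization method. Ten concurrent callers race, and only one of them performs the initialization.

```csharp
#nullable enable
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var initSemaphore = new SemaphoreSlim(1, 1);
object? client = null;          // stands in for s_deviceClient
int initializations = 0;

// Ten concurrent callers, like several status-change callbacks firing at once.
await Task.WhenAll(Enumerable.Range(0, 10).Select(_ => Task.Run(() => InitializeIfNeededAsync())));
Console.WriteLine($"Initializations: {initializations}");

async Task InitializeIfNeededAsync()
{
    // First check without the semaphore: a cheap fast path once the client exists.
    if (Volatile.Read(ref client) != null) return;

    await initSemaphore.WaitAsync();
    try
    {
        // Second check under the semaphore: another caller may have finished while we waited.
        if (Volatile.Read(ref client) == null)
        {
            Interlocked.Increment(ref initializations);
            await Task.Delay(50);                  // simulate creating and configuring the client
            Volatile.Write(ref client, new object());
        }
    }
    finally
    {
        initSemaphore.Release();
    }
}
```

The second check inside the semaphore is what makes this safe. Without it, every caller that passed the first check before initialization finished would create its own client. That is exactly the duplicate-connection situation the sample is designed to avoid.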
Next, define ShouldClientBeInitialized:
```csharp
private bool ShouldClientBeInitialized(ConnectionStatus connectionStatus)
{
    // Re-initialize only from Disconnected or Disabled, and only while a connection string is left to try.
    return (connectionStatus == ConnectionStatus.Disconnected || connectionStatus == ConnectionStatus.Disabled)
        && _deviceConnectionStrings.Any();
}
```
Then define the handler that monitors connection status changes:
```csharp
// It is not good practice to have async void methods, however, DeviceClient.SetConnectionStatusChangesHandler() event handler signature has a void return type.
// As a result, any operation within this block will be executed unmonitored on another thread.
// To prevent multi-threaded synchronization issues, the async method InitializeAndSetupClientAsync being called in here first grabs a lock
// before attempting to initialize or dispose the device client instance.
private async void ConnectionStatusChangeHandler(ConnectionStatus status, ConnectionStatusChangeReason reason)
{
    _logger.LogDebug($"Connection status changed: status={status}, reason={reason}");
    s_connectionStatus = status;

    switch (status)
    {
        case ConnectionStatus.Connected:
            _logger.LogDebug("### The DeviceClient is CONNECTED; all operations will be carried out as normal.");
            break;

        case ConnectionStatus.Disconnected_Retrying:
            _logger.LogDebug("### The DeviceClient is retrying based on the retry policy. Do NOT close or open the DeviceClient instance");
            break;

        case ConnectionStatus.Disabled:
            _logger.LogDebug("### The DeviceClient has been closed gracefully." +
                "\nIf you want to perform more operations on the device client, you should dispose (DisposeAsync()) and then open (OpenAsync()) the client.");
            break;

        case ConnectionStatus.Disconnected:
            switch (reason)
            {
                case ConnectionStatusChangeReason.Bad_Credential:
                    // When getting this reason, the current connection string being used is not valid.
                    // If we had a backup, we can try using that.
                    _deviceConnectionStrings.RemoveAt(0);
                    if (_deviceConnectionStrings.Any())
                    {
                        _logger.LogWarning("The current connection string is invalid. Trying another.");
                        await InitializeAndSetupClientAsync(s_cancellationTokenSource.Token);
                        break;
                    }

                    _logger.LogWarning("### The supplied credentials are invalid. Update the parameters and run again.");
                    s_cancellationTokenSource.Cancel();
                    break;

                case ConnectionStatusChangeReason.Device_Disabled:
                    _logger.LogWarning("### The device has been deleted or marked as disabled (on your hub instance)." +
                        "\nFix the device status in Azure and then create a new device client instance.");
                    s_cancellationTokenSource.Cancel();
                    break;

                case ConnectionStatusChangeReason.Retry_Expired:
                    _logger.LogWarning("### The DeviceClient has been disconnected because the retry policy expired." +
                        "\nIf you want to perform more operations on the device client, you should dispose (DisposeAsync()) and then open (OpenAsync()) the client.");
                    await InitializeAndSetupClientAsync(s_cancellationTokenSource.Token);
                    break;

                case ConnectionStatusChangeReason.Communication_Error:
                    _logger.LogWarning("### The DeviceClient has been disconnected due to a non-retry-able exception. Inspect the exception for details." +
                        "\nIf you want to perform more operations on the device client, you should dispose (DisposeAsync()) and then open (OpenAsync()) the client.");
                    await InitializeAndSetupClientAsync(s_cancellationTokenSource.Token);
                    break;

                default:
                    _logger.LogError("### This combination of ConnectionStatus and ConnectionStatusChangeReason is not expected, contact the client library team with logs.");
                    break;
            }
            break;

        default:
            _logger.LogError("### This combination of ConnectionStatus and ConnectionStatusChangeReason is not expected, contact the client library team with logs.");
            break;
    }
}
```
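The handler is async void, so any exception thrown by the awaited InitializeAndSetupClientAsync call escapes to the thread pool. This could happen, for example, if a backup connection string is malformed. In a console app, that terminates the process. One common mitigation, which is not part of the original sample, is to wrap the handler body in try/catch. The sketch below demonstrates this with a plain `Action<string>` delegate standing in for the SDK's handler type. `ReinitializeAsync` is hypothetical and always fails.

```csharp
#nullable enable
using System;
using System.Threading.Tasks;

var handlerFinished = new TaskCompletionSource<bool>();
string? lastError = null;

// A void-returning delegate, like the SDK's connection status change handler.
Action<string> statusChanged = OnStatusChanged;
statusChanged("Disconnected");

// Wait for the fire-and-forget handler so the demo can observe its outcome.
bool completed = await handlerFinished.Task;
Console.WriteLine($"Handler completed: {completed}, error handled: {lastError}");

async void OnStatusChanged(string status)
{
    try
    {
        // Stand-in for InitializeAndSetupClientAsync failing inside the handler.
        await ReinitializeAsync(status);
        handlerFinished.SetResult(true);
    }
    catch (Exception ex)
    {
        // Without this catch, an exception escaping an async void method is rethrown
        // on the thread pool and crashes the process.
        lastError = ex.Message;
        handlerFinished.SetResult(false);
    }
}

static async Task ReinitializeAsync(string status)
{
    await Task.Yield();
    throw new FormatException($"Could not re-initialize after status '{status}'.");
}
```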
Then define the methods that handle twin updates and send and receive messages:
```csharp
private async Task HandleTwinUpdateNotificationsAsync(TwinCollection twinUpdateRequest, object userContext)
{
    // The cancellation token was registered as the callback's userContext.
    CancellationToken cancellationToken = (CancellationToken)userContext;

    if (!cancellationToken.IsCancellationRequested)
    {
        var reportedProperties = new TwinCollection();

        _logger.LogInformation($"Twin property update requested: \n{twinUpdateRequest.ToJson()}");

        // For the purpose of this sample, we'll blindly accept all twin property write requests.
        foreach (KeyValuePair<string, object> desiredProperty in twinUpdateRequest)
        {
            _logger.LogInformation($"Setting property {desiredProperty.Key} to {desiredProperty.Value}.");
            reportedProperties[desiredProperty.Key] = desiredProperty.Value;
        }

        // Report the accepted values back to the hub as reported properties.
        await RetryOperationHelper.RetryTransientExceptionsAsync(
            operationName: "UpdateReportedProperties",
            asyncOperation: async () => await s_deviceClient.UpdateReportedPropertiesAsync(reportedProperties, cancellationToken),
            shouldExecuteOperation: () => IsDeviceConnected,
            logger: _logger,
            exceptionsToBeIgnored: _exceptionsToBeIgnored,
            cancellationToken: cancellationToken);
    }
}

private async Task SendMessagesAsync(CancellationToken cancellationToken)
{
    int messageCount = 0;

    while (!cancellationToken.IsCancellationRequested)
    {
        if (IsDeviceConnected)
        {
            _logger.LogInformation($"Device sending message {++messageCount} to IoT hub.");

            using Message message = PrepareMessage(messageCount);
            await RetryOperationHelper.RetryTransientExceptionsAsync(
                operationName: $"SendD2CMessage_{messageCount}",
                asyncOperation: async () => await s_deviceClient.SendEventAsync(message),
                shouldExecuteOperation: () => IsDeviceConnected,
                logger: _logger,
                exceptionsToBeIgnored: _exceptionsToBeIgnored,
                cancellationToken: cancellationToken);

            _logger.LogInformation($"Device sent message {messageCount} to IoT hub.");
        }

        await Task.Delay(s_sleepDuration);
    }
}

private async Task ReceiveMessagesAsync(CancellationToken cancellationToken)
{
    var c2dReceiveExceptionsToBeIgnored = new Dictionary<Type, string>(_exceptionsToBeIgnored)
    {
        { typeof(DeviceMessageLockLostException), "Attempted to complete a received message whose lock token has expired" }
    };

    while (!cancellationToken.IsCancellationRequested)
    {
        if (!IsDeviceConnected)
        {
            await Task.Delay(s_sleepDuration);
            continue;
        }
        else if (_transportType == TransportType.Http1)
        {
            // The call to ReceiveAsync over HTTP completes immediately, rather than waiting up to the specified
            // time or when a cancellation token is signaled, so if we want it to poll at the same rate, we need
            // to add an explicit delay here.
            await Task.Delay(s_sleepDuration);
        }

        _logger.LogInformation($"Device waiting for C2D messages from the hub for {s_sleepDuration}." +
            "\nUse the IoT Hub Azure Portal or Azure IoT Explorer to send a message to this device.");

        await RetryOperationHelper.RetryTransientExceptionsAsync(
            operationName: "ReceiveAndCompleteC2DMessage",
            asyncOperation: async () => await ReceiveMessageAndCompleteAsync(),
            shouldExecuteOperation: () => IsDeviceConnected,
            logger: _logger,
            exceptionsToBeIgnored: c2dReceiveExceptionsToBeIgnored,
            cancellationToken: cancellationToken);
    }
}

private async Task ReceiveMessageAndCompleteAsync()
{
    using var cts = new CancellationTokenSource(s_sleepDuration);
    Message receivedMessage = null;
    try
    {
        // AMQP library does not take a cancellation token but does take a time span
        // so we'll call this API differently.
        if (_amqpTransports.Contains(_transportType))
        {
            receivedMessage = await s_deviceClient.ReceiveAsync(s_sleepDuration);
        }
        else
        {
            receivedMessage = await s_deviceClient.ReceiveAsync(cts.Token);
        }
    }
    catch (IotHubCommunicationException ex) when (ex.InnerException is OperationCanceledException)
    {
        _logger.LogInformation("Timed out waiting to receive a message.");
    }

    if (receivedMessage == null)
    {
        _logger.LogInformation("No message received.");
        return;
    }

    try
    {
        // Decode as UTF-8 so that non-ASCII text sent from the portal is not garbled.
        string messageData = Encoding.UTF8.GetString(receivedMessage.GetBytes());
        var formattedMessage = new StringBuilder($"Received message: [{messageData}]");

        foreach (var prop in receivedMessage.Properties)
        {
            formattedMessage.AppendLine($"\n\tProperty: key={prop.Key}, value={prop.Value}");
        }
        _logger.LogInformation(formattedMessage.ToString());

        await s_deviceClient.CompleteAsync(receivedMessage);
        _logger.LogInformation($"Completed message [{messageData}].");
    }
    finally
    {
        receivedMessage.Dispose();
    }
}

private Message PrepareMessage(int messageId)
{
    var temperature = s_randomGenerator.Next(20, 35);
    var humidity = s_randomGenerator.Next(60, 80);
    // A JSON payload, to match ContentType below.
    string messagePayload = $"{{\"temperature\":{temperature},\"humidity\":{humidity}}}";

    var eventMessage = new Message(Encoding.UTF8.GetBytes(messagePayload))
    {
        MessageId = messageId.ToString(),
        // "utf-8"; Encoding.UTF8.ToString() would return the .NET type name instead.
        ContentEncoding = Encoding.UTF8.WebName,
        ContentType = "application/json",
    };
    eventMessage.Properties.Add("temperatureAlert", (temperature > TemperatureThreshold) ? "true" : "false");

    return eventMessage;
}
```
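PrepareMessage declares `ContentType = "application/json"`, so the body should actually be JSON. The following stdlib-only sketch builds such a payload with System.Text.Json and adds the `temperatureAlert` application property. Fixed readings replace the sample's random values, and `BuildPayload` and `IsAlert` are illustrative names, not part of the sample.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

const int TemperatureThreshold = 30;

// Fixed readings so the output is predictable; the sample draws them from Random.
string payload = BuildPayload(temperature: 31, humidity: 70);
byte[] body = Encoding.UTF8.GetBytes(payload);
var properties = new Dictionary<string, string>
{
    // Application property that routing queries can filter on without parsing the body.
    ["temperatureAlert"] = IsAlert(31) ? "true" : "false",
};

Console.WriteLine(payload);
Console.WriteLine($"{body.Length} bytes, temperatureAlert={properties["temperatureAlert"]}");

// Serialize an anonymous object so the body is guaranteed to be valid JSON.
static string BuildPayload(int temperature, int humidity) =>
    JsonSerializer.Serialize(new { temperature, humidity });

bool IsAlert(int temperature) => temperature > TemperatureThreshold;
```

Serializing through `JsonSerializer` avoids hand-escaping braces and quotes in an interpolated string, which is easy to get wrong.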
The code above relies on a retry helper, RetryOperationHelper.cs. Create this file in the project root with the following content:
```csharp
namespace ReconnectDeviceSample;

/// <summary>
/// Helper for retrying async operations that fail with transient exceptions.
/// </summary>
internal class RetryOperationHelper
{
    /// <summary>
    /// Retry an async operation on encountering a transient operation. The retry strategy followed is an exponential backoff strategy.
    /// </summary>
    /// <param name="operationName">An identifier for the async operation to be executed. This is used for debugging purposes.</param>
    /// <param name="asyncOperation">The async operation to be retried.</param>
    /// <param name="shouldExecuteOperation">A function that determines if the operation should be executed.
    /// Eg.: for scenarios when we want to execute the operation only if the client is connected, this would be a function that returns if the client is currently connected.</param>
    /// <param name="logger">The <see cref="ILogger"/> instance to be used.</param>
    /// <param name="exceptionsToBeIgnored">An optional list of exceptions that can be ignored.</param>
    /// <param name="cancellationToken">The cancellation token to cancel the operation.</param>
    internal static async Task RetryTransientExceptionsAsync(
        string operationName,
        Func<Task> asyncOperation,
        Func<bool> shouldExecuteOperation,
        ILogger logger,
        IDictionary<Type, string>? exceptionsToBeIgnored = default,
        CancellationToken cancellationToken = default)
    {
        IRetryPolicy retryPolicy = new ExponentialBackoffTransientExceptionRetryPolicy(maxRetryCount: int.MaxValue, exceptionsToBeIgnored: exceptionsToBeIgnored);

        int attempt = 0;
        bool shouldRetry;
        do
        {
            Exception lastException = new IotHubCommunicationException("Client is currently reconnecting internally; attempt the operation after some time.");

            try
            {
                if (shouldExecuteOperation())
                {
                    logger.LogInformation(FormatRetryOperationLogMessage(operationName, attempt, "executing."));

                    await asyncOperation();
                    break;
                }
                else
                {
                    logger.LogWarning(FormatRetryOperationLogMessage(operationName, attempt, "operation is not ready to be executed. Attempt discarded."));
                }
            }
            catch (Exception ex)
            {
                logger.LogWarning(FormatRetryOperationLogMessage(operationName, attempt, $"encountered an exception while processing the request: {ex}"));
                lastException = ex;
            }

            shouldRetry = retryPolicy.ShouldRetry(++attempt, lastException, out TimeSpan retryInterval);
            if (shouldRetry)
            {
                logger.LogWarning(FormatRetryOperationLogMessage(operationName, attempt, $"caught a recoverable exception, will retry in {retryInterval}."));
                await Task.Delay(retryInterval);
            }
            else
            {
                logger.LogWarning(FormatRetryOperationLogMessage(operationName, attempt, "retry policy determined that the operation should no longer be retried, stopping retries."));
            }
        }
        while (shouldRetry && !cancellationToken.IsCancellationRequested);
    }

    private static string FormatRetryOperationLogMessage(string operationName, int attempt, string logMessage)
    {
        return $"Operation name = {operationName}, attempt = {attempt}, status = {logMessage}";
    }
}
```
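With logging and SDK types stripped out, RetryTransientExceptionsAsync is a gate-then-execute loop. If `shouldExecuteOperation` says the client is not ready, the attempt is skipped. Otherwise the operation runs, and only transient failures are retried. The sketch below makes two simplifying assumptions: `IOException` is the only transient error, and a fixed 50 ms delay replaces the exponential backoff policy. `RetryAsync` and `FlakyOperationAsync` are hypothetical.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

int calls = 0;
bool connected = true;   // stands in for IsDeviceConnected

// Hypothetical flaky operation: fails twice with a transient IOException, then succeeds.
Task FlakyOperationAsync()
{
    calls++;
    if (calls < 3) throw new IOException($"transient failure #{calls}");
    return Task.CompletedTask;
}

int attempts = await RetryAsync(FlakyOperationAsync, () => connected, maxAttempts: 5, token: CancellationToken.None);
Console.WriteLine($"Succeeded after {attempts} attempt(s).");

// Gate-then-execute retry loop: skip while not ready, retry only transient (IOException) failures.
static async Task<int> RetryAsync(Func<Task> operation, Func<bool> shouldExecute, int maxAttempts, CancellationToken token)
{
    for (int attempt = 1; attempt <= maxAttempts && !token.IsCancellationRequested; attempt++)
    {
        if (shouldExecute())
        {
            try
            {
                await operation();
                return attempt;
            }
            catch (IOException)
            {
                // Transient: fall through to the delay and try again.
            }
        }
        await Task.Delay(TimeSpan.FromMilliseconds(50));
    }
    throw new TimeoutException("Operation did not succeed within the allowed attempts.");
}
```

One behavioral difference is worth knowing. When the retry policy gives up, the sample's helper logs and returns without rethrowing, so its caller cannot tell success from failure. This sketch throws instead. Which behavior is right depends on whether the caller needs to react to the failure.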
We also need the helper class ExponentialBackoffTransientExceptionRetryPolicy.cs:
```csharp
namespace ReconnectDeviceSample;

internal class ExponentialBackoffTransientExceptionRetryPolicy : IRetryPolicy
{
    private static readonly TimeSpan s_minBackoff = TimeSpan.FromMilliseconds(100);
    private static readonly TimeSpan s_maxBackoff = TimeSpan.FromSeconds(10);
    private static readonly TimeSpan s_deltaBackoff = TimeSpan.FromMilliseconds(100);

    // Instance (not static) fields, so concurrent retries, such as the send and receive loops,
    // each keep their own retry limit and ignore list.
    private readonly int _maxRetryCount;
    private readonly IDictionary<Type, string>? _exceptionsToBeIgnored;

    internal ExponentialBackoffTransientExceptionRetryPolicy(int maxRetryCount = default, IDictionary<Type, string>? exceptionsToBeIgnored = default)
    {
        _maxRetryCount = maxRetryCount == 0 ? int.MaxValue : maxRetryCount;
        _exceptionsToBeIgnored = exceptionsToBeIgnored;
    }

    public bool ShouldRetry(int currentRetryCount, Exception lastException, out TimeSpan retryInterval)
    {
        if (currentRetryCount < _maxRetryCount)
        {
            if ((lastException is IotHubException iotHubException && iotHubException.IsTransient)
                || ExceptionHelper.IsNetworkExceptionChain(lastException)
                || (_exceptionsToBeIgnored != null && _exceptionsToBeIgnored.ContainsKey(lastException.GetType())))
            {
                // (2^n - 1) * jitter + minBackoff, with jitter in [80, 120) ms, capped at maxBackoff.
                // Random.Shared (.NET 6+) is thread-safe, unlike a shared Random instance.
                double exponentialInterval =
                    (Math.Pow(2.0, currentRetryCount) - 1.0)
                    * Random.Shared.Next(
                        (int)s_deltaBackoff.TotalMilliseconds * 8 / 10,
                        (int)s_deltaBackoff.TotalMilliseconds * 12 / 10)
                    + s_minBackoff.TotalMilliseconds;

                double maxInterval = s_maxBackoff.TotalMilliseconds;
                retryInterval = TimeSpan.FromMilliseconds(Math.Min(exponentialInterval, maxInterval));
                return true;
            }
        }

        retryInterval = TimeSpan.Zero;
        return false;
    }
}
```
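ShouldRetry computes a delay of (2^n - 1) × jitter + 100 ms, capped at 10 s. Here n is the retry count, and jitter comes from `Random.Next(80, 120)`, which returns 80 to 119 ms. Fixing the jitter at 100 ms gives a deterministic schedule, which the following sketch prints. `Backoff` is an illustrative helper, not part of the SDK.

```csharp
using System;
using System.Linq;

// Constants copied from ExponentialBackoffTransientExceptionRetryPolicy.
var minBackoff = TimeSpan.FromMilliseconds(100);
var maxBackoff = TimeSpan.FromSeconds(10);

// The policy draws the jitter from 80..119 ms; fixing it at 100 ms makes the schedule deterministic.
const int jitterMs = 100;

TimeSpan[] schedule = Enumerable.Range(1, 8)
    .Select(retry => Backoff(retry, jitterMs, minBackoff, maxBackoff))
    .ToArray();

for (int i = 0; i < schedule.Length; i++)
{
    Console.WriteLine($"retry {i + 1}: wait {schedule[i].TotalMilliseconds} ms");
}

// (2^n - 1) * jitter + minBackoff, capped at maxBackoff.
static TimeSpan Backoff(int retryCount, int jitter, TimeSpan min, TimeSpan max)
{
    double exponential = (Math.Pow(2.0, retryCount) - 1.0) * jitter + min.TotalMilliseconds;
    return TimeSpan.FromMilliseconds(Math.Min(exponential, max.TotalMilliseconds));
}
```

With these constants the delay roughly doubles each time (200, 400, 800, 1600, 3200, 6400 ms). From the seventh retry onward it hits the 10-second cap for any jitter value. With `maxRetryCount: int.MaxValue`, a long outage therefore settles into one attempt about every 10 seconds.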
Then add the helper class ExceptionHelper.cs:
```csharp
namespace ReconnectDeviceSample;

// Sample exception handler class - this class should be modified based on your application's logic
internal class ExceptionHelper
{
    private static readonly HashSet<Type> s_networkExceptions = new HashSet<Type>
    {
        typeof(IOException),
        typeof(SocketException),
        typeof(ClosedChannelException),
        typeof(TimeoutException),
        typeof(OperationCanceledException),
        typeof(HttpRequestException),
        typeof(WebException),
        typeof(WebSocketException),
    };

    private static bool IsNetwork(Exception singleException)
    {
        return s_networkExceptions.Any(baseExceptionType => baseExceptionType.IsInstanceOfType(singleException));
    }

    internal static bool IsNetworkExceptionChain(Exception exceptionChain)
    {
        return exceptionChain.Unwind(true).Any(e => IsNetwork(e) && !IsTlsSecurity(e));
    }

    internal static bool IsSecurityExceptionChain(Exception exceptionChain)
    {
        return exceptionChain.Unwind(true).Any(e => IsTlsSecurity(e));
    }

    private static bool IsTlsSecurity(Exception singleException)
    {
        if (// WinHttpException (0x80072F8F): A security error occurred.
            (RuntimeInformation.IsOSPlatform(OSPlatform.Windows) && (singleException.HResult == unchecked((int)0x80072F8F))) ||
            // CURLE_SSL_CACERT (60): Peer certificate cannot be authenticated with known CA certificates.
            (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows) && (singleException.HResult == 60)) ||
            singleException is AuthenticationException)
        {
            return true;
        }

        return false;
    }
}
```
Finally, add the exception extension class ExceptionExtensions.cs:
```csharp
namespace ReconnectDeviceSample;

internal static class ExceptionExtensions
{
    // Yields the exception and its inner exceptions; with unwindAggregate, also descends into AggregateException.InnerExceptions.
    internal static IEnumerable<Exception> Unwind(this Exception exception, bool unwindAggregate = false)
    {
        while (exception != null)
        {
            yield return exception;

            if (!unwindAggregate)
            {
                exception = exception.InnerException!;
                continue;
            }

            if (exception is AggregateException aggEx && aggEx.InnerExceptions != null)
            {
                foreach (Exception ex in aggEx.InnerExceptions)
                {
                    foreach (Exception innerEx in ex.Unwind(true))
                    {
                        yield return innerEx;
                    }
                }
            }

            exception = exception.InnerException!;
        }
    }
}
```
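ExceptionHelper and Unwind together treat an exception as a network problem if any exception in its chain is one of the listed network types. This includes exceptions nested inside an AggregateException. The self-contained sketch below is simplified in two ways. First, `IsNetworkChain` checks a shorter type list and omits the TLS HResult checks. Second, its `Unwind` stops after descending into an AggregateException's inner exceptions. The sample's version then also follows InnerException, which visits the first inner exception twice; that is harmless when the result is only used with `Any`.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Sockets;

// A SocketException buried two levels deep, the way transport errors often surface.
var wrapped = new InvalidOperationException("send failed",
    new AggregateException(new SocketException((int)SocketError.ConnectionReset)));
var unrelated = new InvalidOperationException("bad state");

Console.WriteLine($"wrapped socket error is network: {IsNetworkChain(wrapped)}");
Console.WriteLine($"unrelated error is network: {IsNetworkChain(unrelated)}");

// Walks InnerException links and descends into every AggregateException branch.
static IEnumerable<Exception> Unwind(Exception? exception)
{
    while (exception != null)
    {
        yield return exception;
        if (exception is AggregateException agg)
        {
            foreach (Exception inner in agg.InnerExceptions)
            {
                foreach (Exception nested in Unwind(inner))
                {
                    yield return nested;
                }
            }
            yield break;
        }
        exception = exception.InnerException;
    }
}

static bool IsNetwork(Exception e) =>
    e is IOException or SocketException or TimeoutException or OperationCanceledException;

static bool IsNetworkChain(Exception exception) => Unwind(exception).Any(IsNetwork);
```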
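With all the helper classes and the main sample class in place, open Program.cs and replace its contents with the following:
```csharp
Console.WriteLine("Connecting to Azure IoT Hub with SAS key connection strings and monitoring connection status changes...");

// Replace the placeholders with the device's primary and secondary connection strings.
List<string> connectionStrings = new List<string>
{
    "<primary connection string>",
    "<secondary connection string>"
};

// Set up logging
using var loggerFactory = LoggerFactory.Create(builder =>
{
    builder
        .AddFilter("Microsoft", LogLevel.Warning)
        .AddFilter("System", LogLevel.Warning)
        // CreateLogger<Program>() uses the category "Program"; Debug shows the sample's reconnection details.
        .AddFilter("Program", LogLevel.Debug)
        .AddConsole();
});
var logger = loggerFactory.CreateLogger<Program>();

var sample = new DeviceReconnectionSample(connectionStrings, TransportType.Mqtt, logger);
await sample.RunSampleAsync(Timeout.InfiniteTimeSpan);

logger.LogInformation("Done.");
return 0;
```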
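Hard-coding connection strings in Program.cs is convenient for a demo but makes them easy to leak through source control. The minimal sketch below reads them from environment variables instead, in priority order (primary first), so the resulting list can be passed straight to the DeviceReconnectionSample constructor. The variable names are assumptions; use whatever your deployment provides.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical variable names, listed in the order the connection strings should be tried.
string[] variableNames = { "IOTHUB_DEVICE_CONN_STRING_PRIMARY", "IOTHUB_DEVICE_CONN_STRING_SECONDARY" };

List<string> connectionStrings = LoadConnectionStrings(variableNames);
Console.WriteLine($"Found {connectionStrings.Count} connection string(s).");

// Reads each variable in order and keeps only the non-empty values.
static List<string> LoadConnectionStrings(IEnumerable<string> names) =>
    names.Select(name => Environment.GetEnvironmentVariable(name))
         .Where(value => !string.IsNullOrWhiteSpace(value))
         .Select(value => value!)
         .ToList();
```

If neither variable is set, the list is empty and the DeviceReconnectionSample constructor throws ArgumentException, which surfaces the misconfiguration at startup.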
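Take the time to understand the design of this code. It prevents many common connectivity problems, such as several client instances competing for the same device identity, or a connection that stays down after a disconnect.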