使用Host模型创建一个基于队列的服务
分类: .Net技术 ◆ 标签: #.Net Host #.Net #基础 ◆ 发布于: 2023-06-15 15:38:40

从本节开始我们使用HOST
模型创建一些实例。
队列服务是一个很好的例子,每次一个新的任务会被添加到队列中,然后队列的任务会依次被执行。
为了理解整个实例,我们使用如下的图来描述各类之间的关系:
同时对我们即将创建的类作如下的说明:
- 接口
IBackgroundTaskQueue
: 用于定义后台使用的队列模型,该接口提供两个方法:QueueBackgroundWorkItemAsync
, 用于向队列添加项目,DequeueAsync
,用于从队列中取出项目 - 类
DefaultBackgroundTaskQueue
, 实现了接口IBackgroundTaskQueue
, 同时可以注册到依赖注入容器中,用于向需要队列服务的代码提供队列服务。该服务会被注册为Singleton
模式。 - 类
QueuedHostedService
: 该类从BackgroundService
继承,并且会通过扩展方法注册到Host
里,由Host
开始运行该服务。这个也是用户开发基于Host
模型的入口。 - 类
MonitorLoop
: 该类实现了监听键盘事件,同时注入Queue, 并根据键盘的输入向队列中添加新的task.
接下来我们开始创建这个项目。
创建项目
请使用如下的命令行来创建项目:
dotnet new worker -o QueueService
cd QueueService
code .
至此完成了项目的创建。
实现提供队列的服务
使用VS code
在根目录下创建一个新文件,并命名:IBackgroundTaskQueue.cs
, 并使用如下的内容:
namespace QueueService; public interface IBackgroundTaskQueue { ValueTask QueueBackgroundWorkItemAsync( Func<CancellationToken, ValueTask> workItem); ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync( CancellationToken cancellationToken); }
然后定义一个该接口的实现:DefaultBackgroundTaskQueue.cs
:
using System.Threading.Channels; namespace QueueService; public class DefaultBackgroundTaskQueue : IBackgroundTaskQueue { private readonly Channel<Func<CancellationToken, ValueTask>> _queue; public DefaultBackgroundTaskQueue(int capacity) { BoundedChannelOptions options = new(capacity) { FullMode = BoundedChannelFullMode.Wait }; _queue = Channel.CreateBounded<Func<CancellationToken, ValueTask>>(options); } public async ValueTask QueueBackgroundWorkItemAsync( Func<CancellationToken, ValueTask> workItem) { if (workItem is null) { throw new ArgumentNullException(nameof(workItem)); } await _queue.Writer.WriteAsync(workItem); } public async ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync( CancellationToken cancellationToken) { Func<CancellationToken, ValueTask>? workItem = await _queue.Reader.ReadAsync(cancellationToken); return workItem; } }
定义后台任务服务
我们这里使用BackgroundService
作为父类定义类:QueuedHostedService.cs
:
namespace QueueService; public sealed class QueuedHostedService : BackgroundService { private readonly IBackgroundTaskQueue _taskQueue; private readonly ILogger<QueuedHostedService> _logger; public QueuedHostedService( IBackgroundTaskQueue taskQueue, ILogger<QueuedHostedService> logger) => (_taskQueue, _logger) = (taskQueue, logger); protected override Task ExecuteAsync(CancellationToken stoppingToken) { _logger.LogInformation( $"{nameof(QueuedHostedService)} is running.{Environment.NewLine}" + $"{Environment.NewLine}Tap W to add a work item to the " + $"background queue.{Environment.NewLine}"); return ProcessTaskQueueAsync(stoppingToken); } private async Task ProcessTaskQueueAsync(CancellationToken stoppingToken) { while (!stoppingToken.IsCancellationRequested) { try { Func<CancellationToken, ValueTask>? workItem = await _taskQueue.DequeueAsync(stoppingToken); await workItem(stoppingToken); } catch (OperationCanceledException) { // Prevent throwing if stoppingToken was signaled } catch (Exception ex) { _logger.LogError(ex, "Error occurred executing task work item."); } } } public override async Task StopAsync(CancellationToken stoppingToken) { _logger.LogInformation( $"{nameof(QueuedHostedService)} is stopping."); await base.StopAsync(stoppingToken); } }
需要注意的是我们这里定义的是从父类BackgroundService
继承的。
创建MonitorLoop
类用于监听键盘和压入任务
使用VS Code 创建文件MonitorLoop.cs
:
namespace QueueService; public class MonitorLoop { private readonly IBackgroundTaskQueue _taskQueue; private readonly ILogger<MonitorLoop> _logger; private readonly CancellationToken _cancellationToken; public MonitorLoop( IBackgroundTaskQueue taskQueue, ILogger<MonitorLoop> logger, IHostApplicationLifetime applicationLifetime) { _taskQueue = taskQueue; _logger = logger; _cancellationToken = applicationLifetime.ApplicationStopping; } public void StartMonitorLoop() { _logger.LogInformation($"{nameof(MonitorAsync)} loop is starting."); // Run a console user input loop in a background thread Task.Run(async () => await MonitorAsync()); } private async ValueTask MonitorAsync() { while (!_cancellationToken.IsCancellationRequested) { var keyStroke = Console.ReadKey(); if (keyStroke.Key == ConsoleKey.W) { // Enqueue a background work item await _taskQueue.QueueBackgroundWorkItemAsync(BuildWorkItemAsync); } } } private async ValueTask BuildWorkItemAsync(CancellationToken token) { // Simulate three 5-second tasks to complete // for each enqueued work item int delayLoop = 0; var guid = Guid.NewGuid(); _logger.LogInformation("Queued work item {Guid} is starting.", guid); while (!token.IsCancellationRequested && delayLoop < 3) { try { await Task.Delay(TimeSpan.FromSeconds(5), token); } catch (OperationCanceledException) { // Prevent throwing if the Delay is cancelled } ++ delayLoop; _logger.LogInformation("Queued work item {Guid} is running. {DelayLoop}/3", guid, delayLoop); } string format = delayLoop switch { 3 => "Queued Background Task {Guid} is complete.", _ => "Queued Background Task {Guid} was cancelled." }; _logger.LogInformation(format, guid); } }
更改Program.cs
现在我们更改一下Program.cs
文件,设置Host
,并向Host
里注册服务,和启动服务.
using QueueService; using IHost host = Host.CreateDefaultBuilder(args) .ConfigureServices((context, services) => { services.AddSingleton<MonitorLoop>(); services.AddHostedService<QueuedHostedService>(); services.AddSingleton<IBackgroundTaskQueue>(_ => { if (!int.TryParse(context.Configuration["QueueCapacity"], out var queueCapacity)) { queueCapacity = 100; } return new DefaultBackgroundTaskQueue(queueCapacity); }); }) .Build(); MonitorLoop monitorLoop = host.Services.GetRequiredService<MonitorLoop>()!; monitorLoop.StartMonitorLoop(); await host.RunAsync();
运行测试
在当前的项目目录下运行dotnet run
:
如下是运行结果,只需要在键盘上按"w"键,即可观察到在向队列中添加任务。