72 lines
2.5 KiB
C#
72 lines
2.5 KiB
C#
using System.Threading.Channels;
|
|
using JobTrackerApi.Controllers;
|
|
|
|
namespace JobTrackerApi.Services;
|
|
|
|
/// <summary>
/// Queue of CV-processing run identifiers, written by producers and drained by a consumer.
/// </summary>
public interface ICvProcessingQueue
{
    /// <summary>Adds a run identifier to the queue.</summary>
    /// <param name="runId">Identifier of the run to enqueue.</param>
    /// <param name="cancellationToken">Token that can cancel the enqueue operation.</param>
    /// <returns>A <see cref="ValueTask"/> that completes when the enqueue operation has finished.</returns>
    ValueTask EnqueueAsync(int runId, CancellationToken cancellationToken = default);

    /// <summary>Exposes the queued run identifiers as an asynchronous stream.</summary>
    /// <param name="cancellationToken">Token that can cancel the enumeration.</param>
    /// <returns>An asynchronous sequence of run identifiers.</returns>
    IAsyncEnumerable<int> DequeueAllAsync(CancellationToken cancellationToken);
}
|
|
|
|
/// <summary>
/// <see cref="ICvProcessingQueue"/> implementation backed by an unbounded in-memory
/// <see cref="Channel{T}"/> of run identifiers.
/// </summary>
public sealed class CvProcessingQueue : ICvProcessingQueue
{
    private readonly Channel<int> _channel;

    /// <summary>Creates the queue with an unbounded channel configured for many writers and one reader.</summary>
    public CvProcessingQueue()
    {
        var options = new UnboundedChannelOptions
        {
            // Multiple producers may enqueue; the channel is told to expect a single reader.
            SingleWriter = false,
            SingleReader = true,
        };

        _channel = Channel.CreateUnbounded<int>(options);
    }

    /// <summary>Writes <paramref name="runId"/> to the underlying channel.</summary>
    /// <param name="runId">Identifier of the run to enqueue.</param>
    /// <param name="cancellationToken">Token forwarded to the channel write.</param>
    /// <returns>The <see cref="ValueTask"/> returned by the channel writer.</returns>
    public ValueTask EnqueueAsync(int runId, CancellationToken cancellationToken = default)
    {
        return _channel.Writer.WriteAsync(runId, cancellationToken);
    }

    /// <summary>Streams every run identifier read from the underlying channel.</summary>
    /// <param name="cancellationToken">Token forwarded to the channel reader.</param>
    /// <returns>The asynchronous sequence produced by the channel reader.</returns>
    public IAsyncEnumerable<int> DequeueAllAsync(CancellationToken cancellationToken)
    {
        return _channel.Reader.ReadAllAsync(cancellationToken);
    }
}
|
|
|
|
/// <summary>
/// <see cref="ICvProcessingQueue"/> implementation that discards every enqueued run identifier
/// and never yields anything to dequeue.
/// </summary>
public sealed class NoOpCvProcessingQueue : ICvProcessingQueue
{
    /// <summary>Shared instance of the no-op queue.</summary>
    public static readonly NoOpCvProcessingQueue Instance = new();

    /// <summary>Ignores <paramref name="runId"/> and returns an already-completed task.</summary>
    /// <param name="runId">Ignored.</param>
    /// <param name="cancellationToken">Ignored.</param>
    /// <returns>A completed <see cref="ValueTask"/>.</returns>
    public ValueTask EnqueueAsync(int runId, CancellationToken cancellationToken = default)
    {
        return ValueTask.CompletedTask;
    }

    /// <summary>Produces an empty asynchronous sequence.</summary>
    /// <param name="cancellationToken">Not observed; the sequence ends immediately.</param>
    /// <returns>An asynchronous sequence with no elements.</returns>
    public async IAsyncEnumerable<int> DequeueAllAsync(
        [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
    {
        // The await on a completed task keeps this a valid async iterator without doing any work.
        await Task.CompletedTask;
        yield break;
    }
}
|
|
|
|
/// <summary>
/// Background worker that drains <see cref="ICvProcessingQueue"/> and processes each queued run
/// inside its own dependency-injection scope.
/// </summary>
public sealed class CvProcessingHostedService : BackgroundService
{
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly ICvProcessingQueue _queue;
    private readonly ILogger<CvProcessingHostedService> _logger;

    /// <summary>Creates the worker.</summary>
    /// <param name="scopeFactory">Factory used to create one service scope per processed run.</param>
    /// <param name="queue">Queue supplying the run identifiers to process.</param>
    /// <param name="logger">Logger used to report failures of individual runs.</param>
    public CvProcessingHostedService(IServiceScopeFactory scopeFactory, ICvProcessingQueue queue, ILogger<CvProcessingHostedService> logger)
    {
        _scopeFactory = scopeFactory;
        _queue = queue;
        _logger = logger;
    }

    /// <summary>
    /// Processes queued runs one at a time until the queue's sequence ends or
    /// <paramref name="stoppingToken"/> is cancelled.
    /// </summary>
    /// <param name="stoppingToken">Token signalled when the worker should stop.</param>
    /// <returns>A task that completes normally when the loop ends, including on requested shutdown.</returns>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        try
        {
            await foreach (var runId in _queue.DequeueAllAsync(stoppingToken))
            {
                try
                {
                    // A fresh scope per run so scoped services are not shared across runs.
                    await using var scope = _scopeFactory.CreateAsyncScope();

                    // NOTE(review): resolving a controller from the container presumably requires it to be
                    // registered as a service (e.g. AddControllersAsServices) — confirm in startup code.
                    var controller = scope.ServiceProvider.GetRequiredService<ProfileCvController>();
                    await controller.ProcessQueuedRunAsync(runId, stoppingToken);
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    // Shutdown was requested while a run was in flight: stop draining.
                    break;
                }
                catch (Exception ex)
                {
                    // One failing run must not terminate the worker; log it and move on to the next.
                    _logger.LogError(ex, "Unhandled CV processing worker failure for run {RunId}", runId);
                }
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Cancellation surfaced while waiting for the next queued item (outside the per-run try).
            // This is the normal shutdown path, so finish quietly instead of ending in a canceled state.
        }
    }
}
|