using System.Threading.Channels; using JobTrackerApi.Controllers; namespace JobTrackerApi.Services; public interface ICvProcessingQueue { ValueTask EnqueueAsync(int runId, CancellationToken cancellationToken = default); IAsyncEnumerable DequeueAllAsync(CancellationToken cancellationToken); } public sealed class CvProcessingQueue : ICvProcessingQueue { private readonly Channel _channel = Channel.CreateUnbounded(new UnboundedChannelOptions { SingleReader = true, SingleWriter = false, }); public ValueTask EnqueueAsync(int runId, CancellationToken cancellationToken = default) => _channel.Writer.WriteAsync(runId, cancellationToken); public IAsyncEnumerable DequeueAllAsync(CancellationToken cancellationToken) => _channel.Reader.ReadAllAsync(cancellationToken); } public sealed class NoOpCvProcessingQueue : ICvProcessingQueue { public static readonly NoOpCvProcessingQueue Instance = new(); public ValueTask EnqueueAsync(int runId, CancellationToken cancellationToken = default) => ValueTask.CompletedTask; public async IAsyncEnumerable DequeueAllAsync([System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken) { await Task.CompletedTask; yield break; } } public sealed class CvProcessingHostedService : BackgroundService { private readonly IServiceScopeFactory _scopeFactory; private readonly ICvProcessingQueue _queue; private readonly ILogger _logger; public CvProcessingHostedService(IServiceScopeFactory scopeFactory, ICvProcessingQueue queue, ILogger logger) { _scopeFactory = scopeFactory; _queue = queue; _logger = logger; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { await foreach (var runId in _queue.DequeueAllAsync(stoppingToken)) { try { await using var scope = _scopeFactory.CreateAsyncScope(); var controller = scope.ServiceProvider.GetRequiredService(); await controller.ProcessQueuedRunAsync(runId, stoppingToken); } catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) { break; } catch (Exception ex) { _logger.LogError(ex, "Unhandled CV processing worker failure for run {RunId}", runId); } } } }