refactor, security updates, cv extraction upgrades
This commit is contained in:
@@ -0,0 +1,71 @@
|
||||
using System.Threading.Channels;
|
||||
using JobTrackerApi.Controllers;
|
||||
|
||||
namespace JobTrackerApi.Services;
|
||||
|
||||
public interface ICvProcessingQueue
|
||||
{
|
||||
ValueTask EnqueueAsync(int runId, CancellationToken cancellationToken = default);
|
||||
IAsyncEnumerable<int> DequeueAllAsync(CancellationToken cancellationToken);
|
||||
}
|
||||
|
||||
public sealed class CvProcessingQueue : ICvProcessingQueue
|
||||
{
|
||||
private readonly Channel<int> _channel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions
|
||||
{
|
||||
SingleReader = true,
|
||||
SingleWriter = false,
|
||||
});
|
||||
|
||||
public ValueTask EnqueueAsync(int runId, CancellationToken cancellationToken = default)
|
||||
=> _channel.Writer.WriteAsync(runId, cancellationToken);
|
||||
|
||||
public IAsyncEnumerable<int> DequeueAllAsync(CancellationToken cancellationToken)
|
||||
=> _channel.Reader.ReadAllAsync(cancellationToken);
|
||||
}
|
||||
|
||||
public sealed class NoOpCvProcessingQueue : ICvProcessingQueue
|
||||
{
|
||||
public static readonly NoOpCvProcessingQueue Instance = new();
|
||||
public ValueTask EnqueueAsync(int runId, CancellationToken cancellationToken = default) => ValueTask.CompletedTask;
|
||||
public async IAsyncEnumerable<int> DequeueAllAsync([System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
|
||||
{
|
||||
await Task.CompletedTask;
|
||||
yield break;
|
||||
}
|
||||
}
|
||||
|
||||
public sealed class CvProcessingHostedService : BackgroundService
|
||||
{
|
||||
private readonly IServiceScopeFactory _scopeFactory;
|
||||
private readonly ICvProcessingQueue _queue;
|
||||
private readonly ILogger<CvProcessingHostedService> _logger;
|
||||
|
||||
public CvProcessingHostedService(IServiceScopeFactory scopeFactory, ICvProcessingQueue queue, ILogger<CvProcessingHostedService> logger)
|
||||
{
|
||||
_scopeFactory = scopeFactory;
|
||||
_queue = queue;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
await foreach (var runId in _queue.DequeueAllAsync(stoppingToken))
|
||||
{
|
||||
try
|
||||
{
|
||||
await using var scope = _scopeFactory.CreateAsyncScope();
|
||||
var controller = scope.ServiceProvider.GetRequiredService<ProfileCvController>();
|
||||
await controller.ProcessQueuedRunAsync(runId, stoppingToken);
|
||||
}
|
||||
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
break;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Unhandled CV processing worker failure for run {RunId}", runId);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user