Files
jobtrackingapp/JobTrackerApi/Services/SummarizerService.cs
T

379 lines
14 KiB
C#

using System;
using System.Diagnostics;
using System.Net.Http;
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace JobTrackerApi.Services
{
/// <summary>
/// Immutable snapshot of summarizer state: upstream health details, latency-probe results
/// and in-process request counters.
/// </summary>
/// <param name="Healthy">Whether the upstream health check succeeded.</param>
/// <param name="Model">Model name reported by the upstream health check, if any.</param>
/// <param name="Device">Device reported by the upstream health check, if any.</param>
/// <param name="GpuAvailable">GPU availability reported upstream; <c>null</c> when not reported.</param>
/// <param name="GpuName">GPU name reported upstream, if any.</param>
/// <param name="HealthLatencyMs">Round-trip time of the health check in milliseconds, if it completed.</param>
/// <param name="ProbeLatencyMs">Round-trip time of the most recent latency probe in milliseconds.</param>
/// <param name="LastProbeAt">When the most recent probe finished.</param>
/// <param name="LastProbeSuccessAt">When a probe last succeeded.</param>
/// <param name="LastProbeFailureAt">When a probe last failed.</param>
/// <param name="ProbeFailures">Number of probes that have failed.</param>
/// <param name="Requests">Number of summarization requests, including cache hits.</param>
/// <param name="CacheHits">Requests answered from the cache.</param>
/// <param name="CacheMisses">Requests that were not found in the cache.</param>
/// <param name="Failures">Requests that failed.</param>
/// <param name="AverageLatencyMs">Average request latency in milliseconds; <c>null</c> before the first request.</param>
/// <param name="LastSuccessAt">When a request last succeeded.</param>
/// <param name="LastFailureAt">When a request last failed.</param>
/// <param name="LastError">Most recent error message, if any.</param>
public sealed record SummarizerMetrics(
    // Upstream health check
    bool Healthy, string? Model, string? Device, bool? GpuAvailable, string? GpuName, double? HealthLatencyMs,

    // Latency probe
    double? ProbeLatencyMs, DateTimeOffset? LastProbeAt, DateTimeOffset? LastProbeSuccessAt,
    DateTimeOffset? LastProbeFailureAt, int ProbeFailures,

    // Request counters
    int Requests, int CacheHits, int CacheMisses, int Failures, double? AverageLatencyMs,
    DateTimeOffset? LastSuccessAt, DateTimeOffset? LastFailureAt, string? LastError);
/// <summary>
/// Text summarization backed by an external summarizer, plus latency probing and metrics reporting.
/// </summary>
public interface ISummarizerService
{
    /// <summary>Summarizes <paramref name="text"/> within the given length bounds.</summary>
    /// <param name="text">Text to summarize.</param>
    /// <param name="maxLength">Upper length bound forwarded to the summarizer.</param>
    /// <param name="minLength">Lower length bound forwarded to the summarizer.</param>
    /// <returns>The summary, or <c>null</c> when none could be produced.</returns>
    Task<string?> SummarizeAsync(string text, int maxLength = 150, int minLength = 30);

    /// <summary>Summarizes <paramref name="text"/> guided by a leading <paramref name="instruction"/>.</summary>
    /// <param name="instruction">Instruction describing what the summary should focus on.</param>
    /// <param name="text">Section text to summarize.</param>
    /// <param name="maxLength">Upper length bound forwarded to the summarizer.</param>
    /// <param name="minLength">Lower length bound forwarded to the summarizer.</param>
    /// <returns>The summary, or <c>null</c> when none could be produced.</returns>
    Task<string?> SummarizeSectionAsync(string instruction, string text, int maxLength = 180, int minLength = 40);

    /// <summary>Sends a single latency probe to the summarizer and records its outcome.</summary>
    /// <param name="cancellationToken">Token used to abandon the probe.</param>
    Task RunProbeAsync(CancellationToken cancellationToken = default);

    /// <summary>Checks upstream health and returns it together with the locally collected counters.</summary>
    /// <param name="cancellationToken">Token used to abandon the health check.</param>
    Task<SummarizerMetrics> GetMetricsAsync(CancellationToken cancellationToken = default);
}
/// <summary>
/// <see cref="ISummarizerService"/> implementation that calls the summarizer through the named
/// "summarizer" <see cref="HttpClient"/>, caches non-blank summaries in <see cref="IMemoryCache"/>
/// for six hours, and keeps in-process counters/timestamps reported by <see cref="GetMetricsAsync"/>.
/// </summary>
/// <remarks>
/// Integer counters are updated with <see cref="Interlocked"/>; timestamps, the last error message
/// and the last probe latency are guarded by <c>_metricsLock</c>.
/// </remarks>
public class SummarizerService : ISummarizerService
{
    private const string ClientName = "summarizer";
    private const string SummarizePath = "/summarize";
    private static readonly TimeSpan SummaryCacheDuration = TimeSpan.FromHours(6);

    private readonly IHttpClientFactory _httpFactory;
    private readonly IMemoryCache _cache;
    private readonly object _metricsLock = new();
    private int _requests;
    private int _cacheHits;
    private int _cacheMisses;
    private int _failures;
    // Sum of upstream /summarize call durations in TimeSpan ticks (100 ns units) — NOT Stopwatch ticks,
    // whose unit depends on Stopwatch.Frequency and differs between platforms.
    private long _totalLatencyTicks;
    private DateTimeOffset? _lastSuccessAt;
    private DateTimeOffset? _lastFailureAt;
    private double? _lastProbeLatencyMs;
    private DateTimeOffset? _lastProbeAt;
    private DateTimeOffset? _lastProbeSuccessAt;
    private DateTimeOffset? _lastProbeFailureAt;
    private int _probeFailures;
    private string? _lastError;

    public SummarizerService(IHttpClientFactory httpFactory, IMemoryCache cache)
    {
        _httpFactory = httpFactory;
        _cache = cache;
    }

    /// <summary>
    /// Builds the cache key for a request. A SHA-256 content hash is used instead of
    /// <c>string.GetHashCode()</c> so keys are collision-resistant and stable across process restarts.
    /// </summary>
    private static string BuildCacheKey(string text, int maxLength, int minLength)
    {
        var payload = $"{text}\n::{maxLength}:{minLength}";
        var hash = Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(payload))).ToLowerInvariant();
        return $"summ:{hash}";
    }

    /// <summary>Summarizes <paramref name="text"/>; returns <c>null</c> for blank input or on failure.</summary>
    public async Task<string?> SummarizeAsync(string text, int maxLength = 150, int minLength = 30)
    {
        if (string.IsNullOrWhiteSpace(text)) return null;
        return await SummarizeCoreAsync(text, maxLength, minLength);
    }

    /// <summary>
    /// Summarizes <paramref name="text"/> prefixed by <paramref name="instruction"/> (both trimmed and
    /// separated by a blank line); returns <c>null</c> when either is blank or on failure.
    /// </summary>
    public Task<string?> SummarizeSectionAsync(string instruction, string text, int maxLength = 180, int minLength = 40)
    {
        if (string.IsNullOrWhiteSpace(instruction) || string.IsNullOrWhiteSpace(text)) return Task.FromResult<string?>(null);
        var composed = $"{instruction.Trim()}\n\n{text.Trim()}";
        return SummarizeCoreAsync(composed, maxLength, minLength);
    }

    /// <summary>
    /// Shared implementation: serves from cache when possible, otherwise POSTs to the summarizer.
    /// Never throws; every failure (exception, non-success status, missing summary) is recorded in
    /// the failure metrics and reported to the caller as <c>null</c>.
    /// </summary>
    private async Task<string?> SummarizeCoreAsync(string text, int maxLength, int minLength)
    {
        var key = BuildCacheKey(text, maxLength, minLength);
        Interlocked.Increment(ref _requests);
        if (_cache.TryGetValue<string>(key, out var cached))
        {
            Interlocked.Increment(ref _cacheHits);
            RecordSuccess();
            return cached;
        }

        Interlocked.Increment(ref _cacheMisses);
        var client = _httpFactory.CreateClient(ClientName);
        var payload = JsonSerializer.Serialize(new { text, max_length = maxLength, min_length = minLength });
        using var content = new StringContent(payload, Encoding.UTF8, "application/json");
        var sw = Stopwatch.StartNew();
        try
        {
            // Dispose the response so its content/connection is released on every path, including early returns.
            using var res = await client.PostAsync(SummarizePath, content);
            sw.Stop();
            RecordLatency(sw.Elapsed);
            if (!res.IsSuccessStatusCode)
            {
                RecordFailure($"Summarizer returned {(int)res.StatusCode}.");
                return null;
            }

            using var stream = await res.Content.ReadAsStreamAsync();
            using var doc = await JsonDocument.ParseAsync(stream);
            if (doc.RootElement.ValueKind != JsonValueKind.Object
                || !doc.RootElement.TryGetProperty("summary", out var el))
            {
                RecordFailure("Summarizer response did not contain a summary.");
                return null;
            }

            var s = el.GetString();
            if (!string.IsNullOrWhiteSpace(s)) _cache.Set(key, s, SummaryCacheDuration);
            RecordSuccess();
            return s;
        }
        catch (Exception ex)
        {
            // Only record latency if the HTTP call itself failed; if the failure came later (e.g. while
            // parsing), the call's latency was already recorded and must not be counted twice.
            if (sw.IsRunning)
            {
                sw.Stop();
                RecordLatency(sw.Elapsed);
            }
            RecordFailure(ex.Message);
            return null;
        }
    }

    /// <summary>
    /// Sends a fixed short text to the summarizer and records probe latency and outcome.
    /// Upstream failures are recorded, not thrown.
    /// </summary>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="cancellationToken"/> was cancelled; this is not counted as a probe failure.
    /// </exception>
    public async Task RunProbeAsync(CancellationToken cancellationToken = default)
    {
        const string probeText = "Summarizer latency probe for job tracker telemetry.";
        var client = _httpFactory.CreateClient(ClientName);
        var payload = JsonSerializer.Serialize(new { text = probeText, max_length = 48, min_length = 12 });
        using var content = new StringContent(payload, Encoding.UTF8, "application/json");
        var sw = Stopwatch.StartNew();
        try
        {
            using var res = await client.PostAsync(SummarizePath, content, cancellationToken);
            sw.Stop();
            RecordProbeTiming(sw.Elapsed);
            if (!res.IsSuccessStatusCode)
            {
                RecordProbeFailure($"Probe returned {(int)res.StatusCode}.");
                return;
            }

            using var stream = await res.Content.ReadAsStreamAsync(cancellationToken);
            using var doc = await JsonDocument.ParseAsync(stream, cancellationToken: cancellationToken);
            if (doc.RootElement.ValueKind != JsonValueKind.Object
                || !doc.RootElement.TryGetProperty("summary", out var summaryEl)
                || string.IsNullOrWhiteSpace(summaryEl.GetString()))
            {
                RecordProbeFailure("Probe returned an empty summary.");
                return;
            }

            lock (_metricsLock)
            {
                _lastProbeSuccessAt = DateTimeOffset.UtcNow;
                _lastError = null;
            }
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // The caller (e.g. host shutdown) abandoned the probe: that says nothing about upstream
            // health, so propagate instead of recording a failure.
            throw;
        }
        catch (Exception ex)
        {
            // Timing is only missing if the HTTP call itself failed.
            if (sw.IsRunning)
            {
                sw.Stop();
                RecordProbeTiming(sw.Elapsed);
            }
            RecordProbeFailure(ex.Message);
        }
    }

    /// <summary>
    /// Calls the summarizer's <c>/health</c> endpoint and combines its answer with the locally
    /// collected counters. Upstream errors are folded into <see cref="SummarizerMetrics.LastError"/>.
    /// </summary>
    /// <exception cref="OperationCanceledException"><paramref name="cancellationToken"/> was cancelled.</exception>
    public async Task<SummarizerMetrics> GetMetricsAsync(CancellationToken cancellationToken = default)
    {
        var client = _httpFactory.CreateClient(ClientName);
        string? model = null;
        string? device = null;
        bool? gpuAvailable = null;
        string? gpuName = null;
        double? healthLatencyMs = null;
        var healthy = false;
        string? healthError = null;
        try
        {
            var sw = Stopwatch.StartNew();
            using var res = await client.GetAsync("/health", cancellationToken);
            sw.Stop();
            healthLatencyMs = Math.Round(sw.Elapsed.TotalMilliseconds, 1);
            healthy = res.IsSuccessStatusCode;
            if (healthy)
            {
                using var stream = await res.Content.ReadAsStreamAsync(cancellationToken);
                using var doc = await JsonDocument.ParseAsync(stream, cancellationToken: cancellationToken);
                var root = doc.RootElement;
                if (root.ValueKind == JsonValueKind.Object)
                {
                    // Kind-checked reads: one malformed field must not prevent reading the others.
                    model = ReadString(root, "model");
                    device = ReadString(root, "device");
                    gpuName = ReadString(root, "gpu_name");
                    if (root.TryGetProperty("gpu_available", out var gpuAvailableEl)
                        && gpuAvailableEl.ValueKind is JsonValueKind.True or JsonValueKind.False)
                    {
                        gpuAvailable = gpuAvailableEl.GetBoolean();
                    }
                }
            }
            else
            {
                healthError = $"Health check returned {(int)res.StatusCode}.";
            }
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // An aborted caller is not an upstream health problem.
            throw;
        }
        catch (Exception ex)
        {
            healthError = ex.Message;
        }

        var requests = Volatile.Read(ref _requests);
        var cacheHits = Volatile.Read(ref _cacheHits);
        var cacheMisses = Volatile.Read(ref _cacheMisses);
        var failures = Volatile.Read(ref _failures);
        var totalLatencyTicks = Interlocked.Read(ref _totalLatencyTicks);
        DateTimeOffset? lastSuccessAt;
        DateTimeOffset? lastFailureAt;
        double? probeLatencyMs;
        DateTimeOffset? lastProbeAt;
        DateTimeOffset? lastProbeSuccessAt;
        DateTimeOffset? lastProbeFailureAt;
        string? lastError;
        lock (_metricsLock)
        {
            lastSuccessAt = _lastSuccessAt;
            lastFailureAt = _lastFailureAt;
            probeLatencyMs = _lastProbeLatencyMs;
            lastProbeAt = _lastProbeAt;
            lastProbeSuccessAt = _lastProbeSuccessAt;
            lastProbeFailureAt = _lastProbeFailureAt;
            lastError = _lastError;
        }

        if (!healthy && !string.IsNullOrWhiteSpace(healthError))
        {
            lastError = healthError;
        }

        // Averaged over all requests: cache hits are not timed, so they count as zero-latency samples.
        double? averageLatencyMs = requests > 0
            ? Math.Round(TimeSpan.FromTicks(totalLatencyTicks).TotalMilliseconds / requests, 1)
            : null;

        return new SummarizerMetrics(
            Healthy: healthy,
            Model: model,
            Device: device,
            GpuAvailable: gpuAvailable,
            GpuName: gpuName,
            HealthLatencyMs: healthLatencyMs,
            ProbeLatencyMs: probeLatencyMs,
            LastProbeAt: lastProbeAt,
            LastProbeSuccessAt: lastProbeSuccessAt,
            LastProbeFailureAt: lastProbeFailureAt,
            ProbeFailures: Volatile.Read(ref _probeFailures),
            Requests: requests,
            CacheHits: cacheHits,
            CacheMisses: cacheMisses,
            Failures: failures,
            AverageLatencyMs: averageLatencyMs,
            LastSuccessAt: lastSuccessAt,
            LastFailureAt: lastFailureAt,
            LastError: lastError
        );
    }

    /// <summary>Returns the string value of <paramref name="propertyName"/>, or <c>null</c> if absent or not a string.</summary>
    private static string? ReadString(JsonElement obj, string propertyName) =>
        obj.TryGetProperty(propertyName, out var el) && el.ValueKind == JsonValueKind.String
            ? el.GetString()
            : null;

    /// <summary>Marks a successful request and clears the last error.</summary>
    private void RecordSuccess()
    {
        lock (_metricsLock)
        {
            _lastSuccessAt = DateTimeOffset.UtcNow;
            _lastError = null;
        }
    }

    /// <summary>Counts a failed request and remembers when and why it failed.</summary>
    private void RecordFailure(string error)
    {
        Interlocked.Increment(ref _failures);
        lock (_metricsLock)
        {
            _lastFailureAt = DateTimeOffset.UtcNow;
            _lastError = error;
        }
    }

    /// <summary>Adds one upstream call's duration (as TimeSpan ticks) to the latency total.</summary>
    private void RecordLatency(TimeSpan elapsed) => Interlocked.Add(ref _totalLatencyTicks, elapsed.Ticks);

    /// <summary>Stores when the latest probe finished and how long it took.</summary>
    private void RecordProbeTiming(TimeSpan elapsed)
    {
        lock (_metricsLock)
        {
            _lastProbeAt = DateTimeOffset.UtcNow;
            _lastProbeLatencyMs = Math.Round(elapsed.TotalMilliseconds, 1);
        }
    }

    /// <summary>Counts a failed probe and remembers when and why it failed.</summary>
    private void RecordProbeFailure(string error)
    {
        Interlocked.Increment(ref _probeFailures);
        lock (_metricsLock)
        {
            _lastProbeFailureAt = DateTimeOffset.UtcNow;
            _lastError = error;
        }
    }
}
/// <summary>
/// Background worker that periodically runs <c>ISummarizerService.RunProbeAsync</c> in a fresh DI scope.
/// </summary>
/// <remarks>
/// Configuration keys:
/// <c>Summarizer:ProbeEnabled</c> (default true),
/// <c>Summarizer:ProbeIntervalSeconds</c> (default 300, clamped to 30..3600),
/// <c>Summarizer:ProbeInitialDelaySeconds</c> (default 15, clamped to 0..600).
/// </remarks>
public sealed class SummarizerProbeHostedService : BackgroundService
{
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly ILogger<SummarizerProbeHostedService> _logger;
    private readonly IConfiguration _cfg;

    public SummarizerProbeHostedService(IServiceScopeFactory scopeFactory, ILogger<SummarizerProbeHostedService> logger, IConfiguration cfg)
    {
        _scopeFactory = scopeFactory;
        _logger = logger;
        _cfg = cfg;
    }

    /// <summary>
    /// Waits the configured startup delay, probes once immediately, then probes on every timer tick
    /// until the host stops.
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        if (!_cfg.GetValue("Summarizer:ProbeEnabled", true))
        {
            return;
        }

        var period = TimeSpan.FromSeconds(Math.Clamp(_cfg.GetValue("Summarizer:ProbeIntervalSeconds", 300), 30, 3600));
        var startupDelay = TimeSpan.FromSeconds(Math.Clamp(_cfg.GetValue("Summarizer:ProbeInitialDelaySeconds", 15), 0, 600));

        if (startupDelay > TimeSpan.Zero)
        {
            await Task.Delay(startupDelay, stoppingToken);
        }

        using var timer = new PeriodicTimer(period);
        var keepRunning = true;
        while (keepRunning)
        {
            // Probe first; only wait for the next tick if the probe was not cut short by shutdown.
            keepRunning = await TryProbeAsync(stoppingToken)
                && await timer.WaitForNextTickAsync(stoppingToken);
        }
    }

    /// <summary>
    /// Runs a single probe inside its own service scope.
    /// Returns <c>false</c> only when the probe was cancelled because the host is stopping; any other
    /// exception is logged as a warning and the loop continues.
    /// </summary>
    private async Task<bool> TryProbeAsync(CancellationToken stoppingToken)
    {
        try
        {
            using var scope = _scopeFactory.CreateScope();
            await scope.ServiceProvider.GetRequiredService<ISummarizerService>().RunProbeAsync(stoppingToken);
            return true;
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            return false;
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "Summarizer latency probe failed.");
            return true;
        }
    }
}
}