using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Svrnty.CQRS.Events.Abstractions;
using Svrnty.CQRS.Events.Abstractions.Configuration;
using Svrnty.CQRS.Events.Abstractions.EventStore;
using Svrnty.CQRS.Events.Abstractions.Models;
using Svrnty.CQRS.Events.Abstractions.Streaming;
using Svrnty.CQRS.Events.HealthCheck;
using Svrnty.CQRS.Events.Subscriptions;

namespace Svrnty.CQRS.Events.HealthCheck;

/// <summary>
/// Implementation of stream health checks.
/// </summary>
/// <remarks>
/// Every individual check runs under a timeout taken from
/// <c>StreamHealthCheckOptions.HealthCheckTimeout</c> and reports failures as an
/// unhealthy result instead of throwing, except when the caller's own
/// cancellation token has been cancelled.
/// </remarks>
// NOTE(review): the generic type arguments in this file had been lost from the text
// under review (e.g. "Task>" and "IEnumerable _subscriptions"). They are reconstructed
// here from how the values are used. IStreamConfiguration, Subscription,
// ILogger<StreamHealthCheck>, Dictionary<string, object> and the
// IReadOnlyDictionary<string, HealthCheckResult> return type are best guesses —
// confirm them against IStreamHealthCheck, HealthCheckResult and the DI registrations.
public sealed class StreamHealthCheck : IStreamHealthCheck
{
    private readonly IEnumerable<IStreamConfiguration> _streamConfigurations;
    private readonly IEnumerable<Subscription> _subscriptions;
    private readonly IEventStreamStore _streamStore;
    private readonly ILogger<StreamHealthCheck>? _logger;
    private readonly StreamHealthCheckOptions _options;

    /// <summary>
    /// Creates the health check.
    /// </summary>
    /// <param name="streamConfigurations">Known stream configurations; <c>null</c> is treated as an empty sequence.</param>
    /// <param name="subscriptions">Known subscriptions; <c>null</c> is treated as an empty sequence.</param>
    /// <param name="streamStore">Store queried for stream length, consumer offset and last update time.</param>
    /// <param name="options">Thresholds and timeout; a default <see cref="StreamHealthCheckOptions"/> instance is used when omitted.</param>
    /// <param name="logger">Optional logger used to record unexpected errors.</param>
    /// <exception cref="ArgumentNullException"><paramref name="streamStore"/> is <c>null</c>.</exception>
    public StreamHealthCheck(
        IEnumerable<IStreamConfiguration> streamConfigurations,
        IEnumerable<Subscription> subscriptions,
        IEventStreamStore streamStore,
        IOptions<StreamHealthCheckOptions>? options = null,
        ILogger<StreamHealthCheck>? logger = null)
    {
        _streamConfigurations = streamConfigurations ?? Enumerable.Empty<IStreamConfiguration>();
        _subscriptions = subscriptions ?? Enumerable.Empty<Subscription>();
        _streamStore = streamStore ?? throw new ArgumentNullException(nameof(streamStore));
        _logger = logger;
        _options = options?.Value ?? new StreamHealthCheckOptions();
    }

    /// <summary>
    /// Checks that a configured stream exists and that its length can be read from the store.
    /// </summary>
    /// <param name="streamName">Name of the stream to check.</param>
    /// <param name="cancellationToken">Token that cancels the check.</param>
    /// <returns>
    /// Healthy when the stream length was read; unhealthy when the stream is not configured,
    /// the check timed out, or the store threw.
    /// </returns>
    /// <exception cref="OperationCanceledException">
    /// The operation was cancelled and <paramref name="cancellationToken"/> is in the cancelled state.
    /// </exception>
    public async Task<HealthCheckResult> CheckStreamHealthAsync(string streamName, CancellationToken cancellationToken = default)
    {
        var sw = Stopwatch.StartNew();

        try
        {
            // Linked source: fires on caller cancellation or when the configured timeout elapses.
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            cts.CancelAfter(_options.HealthCheckTimeout);

            var streamConfig = _streamConfigurations.FirstOrDefault(s => s.StreamName == streamName);
            if (streamConfig is null)
            {
                return HealthCheckResult.Unhealthy(
                    $"Stream '{streamName}' does not exist",
                    duration: sw.Elapsed);
            }

            // Check if we can get the stream length (validates stream is readable)
            var streamLength = await _streamStore.GetStreamLengthAsync(streamName, cts.Token);

            var data = new Dictionary<string, object>
            {
                ["streamName"] = streamName,
                ["streamLength"] = streamLength,
                ["streamType"] = streamConfig.Type.ToString(),
                ["deliverySemantics"] = streamConfig.DeliverySemantics.ToString(),
                ["scope"] = streamConfig.Scope.ToString()
            };

            return HealthCheckResult.Healthy(
                $"Stream '{streamName}' is healthy (length: {streamLength})",
                data,
                sw.Elapsed);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // The caller asked to stop: propagate instead of reporting a health status.
            throw;
        }
        catch (OperationCanceledException)
        {
            // The caller's token is not cancelled, so this is reported as the timeout.
            return HealthCheckResult.Unhealthy(
                $"Health check for stream '{streamName}' timed out after {_options.HealthCheckTimeout.TotalSeconds}s",
                duration: sw.Elapsed);
        }
        catch (Exception ex)
        {
            _logger?.LogError(ex, "Error checking health of stream '{StreamName}'", streamName);
            return HealthCheckResult.Unhealthy(
                $"Error checking stream '{streamName}': {ex.Message}",
                ex,
                duration: sw.Elapsed);
        }
    }

    /// <summary>
    /// Checks a subscription's consumer lag and how long ago its offset was last updated.
    /// </summary>
    /// <param name="streamName">Name of the stream the subscription belongs to.</param>
    /// <param name="subscriptionName">Identifier of the subscription (matched against <c>SubscriptionId</c>).</param>
    /// <param name="cancellationToken">Token that cancels the check.</param>
    /// <returns>
    /// Unhealthy when the stream or subscription is unknown, when the lag or the time since the
    /// last update reaches the unhealthy thresholds, on timeout, or on a store error; degraded
    /// when the degraded thresholds are reached; healthy otherwise.
    /// </returns>
    /// <exception cref="OperationCanceledException">
    /// The operation was cancelled and <paramref name="cancellationToken"/> is in the cancelled state.
    /// </exception>
    public async Task<HealthCheckResult> CheckSubscriptionHealthAsync(string streamName, string subscriptionName, CancellationToken cancellationToken = default)
    {
        var sw = Stopwatch.StartNew();

        try
        {
            // Linked source: fires on caller cancellation or when the configured timeout elapses.
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            cts.CancelAfter(_options.HealthCheckTimeout);

            var streamConfig = _streamConfigurations.FirstOrDefault(s => s.StreamName == streamName);
            if (streamConfig is null)
            {
                return HealthCheckResult.Unhealthy(
                    $"Stream '{streamName}' does not exist",
                    duration: sw.Elapsed);
            }

            var subscription = _subscriptions.FirstOrDefault(s => s.StreamName == streamName && s.SubscriptionId == subscriptionName);
            if (subscription is null)
            {
                return HealthCheckResult.Unhealthy(
                    $"Subscription '{subscriptionName}' does not exist on stream '{streamName}'",
                    duration: sw.Elapsed);
            }

            // Get stream length and consumer offset
            var streamLength = await _streamStore.GetStreamLengthAsync(streamName, cts.Token);
            var consumerOffset = await _streamStore.GetConsumerOffsetAsync(streamName, subscriptionName, cts.Token);

            // Calculate lag. The two values come from separate store calls, so this is a
            // point-in-time approximation rather than an atomic snapshot.
            var lag = streamLength - consumerOffset;

            // Get last update time to detect stalled consumers
            var lastUpdateTime = await _streamStore.GetConsumerLastUpdateTimeAsync(streamName, subscriptionName, cts.Token);
            var timeSinceUpdate = DateTimeOffset.UtcNow - lastUpdateTime;

            var data = new Dictionary<string, object>
            {
                ["streamName"] = streamName,
                ["subscriptionName"] = subscriptionName,
                ["streamLength"] = streamLength,
                ["consumerOffset"] = consumerOffset,
                ["lag"] = lag,
                ["lastUpdateTime"] = lastUpdateTime,
                ["timeSinceUpdate"] = timeSinceUpdate.TotalSeconds,
                ["subscriptionMode"] = subscription.Mode.ToString()
            };

            // Check for unhealthy conditions
            if (lag >= _options.UnhealthyConsumerLagThreshold)
            {
                return HealthCheckResult.Unhealthy(
                    $"Subscription '{subscriptionName}' has excessive lag: {lag} events (threshold: {_options.UnhealthyConsumerLagThreshold})",
                    data: data,
                    duration: sw.Elapsed);
            }

            // A quiet consumer only counts as stalled while there are still unread events.
            if (timeSinceUpdate >= _options.UnhealthyStalledThreshold && streamLength > consumerOffset)
            {
                return HealthCheckResult.Unhealthy(
                    $"Subscription '{subscriptionName}' appears stalled: no progress for {timeSinceUpdate.TotalMinutes:F1} minutes (threshold: {_options.UnhealthyStalledThreshold.TotalMinutes} minutes)",
                    data: data,
                    duration: sw.Elapsed);
            }

            // Check for degraded conditions
            if (lag >= _options.DegradedConsumerLagThreshold)
            {
                return HealthCheckResult.Degraded(
                    $"Subscription '{subscriptionName}' has elevated lag: {lag} events (threshold: {_options.DegradedConsumerLagThreshold})",
                    data: data,
                    duration: sw.Elapsed);
            }

            if (timeSinceUpdate >= _options.DegradedStalledThreshold && streamLength > consumerOffset)
            {
                return HealthCheckResult.Degraded(
                    $"Subscription '{subscriptionName}' has slow progress: no updates for {timeSinceUpdate.TotalMinutes:F1} minutes (threshold: {_options.DegradedStalledThreshold.TotalMinutes} minutes)",
                    data: data,
                    duration: sw.Elapsed);
            }

            return HealthCheckResult.Healthy(
                $"Subscription '{subscriptionName}' is healthy (lag: {lag} events, last update: {timeSinceUpdate.TotalSeconds:F1}s ago)",
                data,
                sw.Elapsed);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // The caller asked to stop: propagate instead of reporting a health status.
            throw;
        }
        catch (OperationCanceledException)
        {
            // The caller's token is not cancelled, so this is reported as the timeout.
            return HealthCheckResult.Unhealthy(
                $"Health check for subscription '{subscriptionName}' timed out after {_options.HealthCheckTimeout.TotalSeconds}s",
                duration: sw.Elapsed);
        }
        catch (Exception ex)
        {
            _logger?.LogError(ex, "Error checking health of subscription '{SubscriptionName}' on stream '{StreamName}'", subscriptionName, streamName);
            return HealthCheckResult.Unhealthy(
                $"Error checking subscription '{subscriptionName}': {ex.Message}",
                ex,
                duration: sw.Elapsed);
        }
    }

    /// <summary>
    /// Runs <see cref="CheckStreamHealthAsync"/> for every configured stream concurrently.
    /// </summary>
    /// <param name="cancellationToken">Token passed to each individual check.</param>
    /// <returns>Results keyed by stream name; a repeated name keeps whichever result was stored last.</returns>
    public async Task<IReadOnlyDictionary<string, HealthCheckResult>> CheckAllStreamsAsync(CancellationToken cancellationToken = default)
    {
        var results = new ConcurrentDictionary<string, HealthCheckResult>();

        var tasks = _streamConfigurations.Select(async stream =>
        {
            var result = await CheckStreamHealthAsync(stream.StreamName, cancellationToken);
            results[stream.StreamName] = result;
        });

        await Task.WhenAll(tasks);
        return results;
    }

    /// <summary>
    /// Runs <see cref="CheckSubscriptionHealthAsync"/> for every configured subscription concurrently.
    /// </summary>
    /// <param name="cancellationToken">Token passed to each individual check.</param>
    /// <returns>
    /// Results keyed by <c>"{StreamName}:{SubscriptionId}"</c>; a repeated key keeps whichever
    /// result was stored last.
    /// </returns>
    public async Task<IReadOnlyDictionary<string, HealthCheckResult>> CheckAllSubscriptionsAsync(CancellationToken cancellationToken = default)
    {
        var results = new ConcurrentDictionary<string, HealthCheckResult>();

        var tasks = _subscriptions.Select(async subscription =>
        {
            var result = await CheckSubscriptionHealthAsync(subscription.StreamName, subscription.SubscriptionId, cancellationToken);
            var key = $"{subscription.StreamName}:{subscription.SubscriptionId}";
            results[key] = result;
        });

        await Task.WhenAll(tasks);
        return results;
    }
}