dotnet-cqrs/Svrnty.CQRS.Events/HealthCheck/StreamHealthCheck.cs

233 lines
9.8 KiB
C#

using System;
using Svrnty.CQRS.Events.HealthCheck;
using Svrnty.CQRS.Events.Abstractions.Models;
using Svrnty.CQRS.Events.Abstractions.EventStore;
using Svrnty.CQRS.Events.Abstractions.Streaming;
using Svrnty.CQRS.Events.Subscriptions;
using Svrnty.CQRS.Events.Abstractions.Configuration;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Svrnty.CQRS.Events.Abstractions;
namespace Svrnty.CQRS.Events.HealthCheck;
/// <summary>
/// Implementation of <see cref="IStreamHealthCheck"/> that reports the health of configured
/// event streams and of the subscriptions consuming them.
/// </summary>
/// <remarks>
/// Stream health is based on whether the stream length can be read from the
/// <see cref="IEventStreamStore"/>. Subscription health is derived from consumer lag
/// (stream length minus consumer offset) and from the time since the consumer was last
/// updated, compared against the thresholds in <see cref="StreamHealthCheckOptions"/>.
/// Every individual check is bounded by the configured <c>HealthCheckTimeout</c>.
/// </remarks>
public sealed class StreamHealthCheck : IStreamHealthCheck
{
    private readonly IEnumerable<IStreamConfiguration> _streamConfigurations;
    private readonly IEnumerable<Subscription> _subscriptions;
    private readonly IEventStreamStore _streamStore;
    private readonly ILogger<StreamHealthCheck>? _logger;
    private readonly StreamHealthCheckOptions _options;

    /// <summary>
    /// Creates a new <see cref="StreamHealthCheck"/>.
    /// </summary>
    /// <param name="streamConfigurations">Configured streams; <c>null</c> is treated as empty.</param>
    /// <param name="subscriptions">Configured subscriptions; <c>null</c> is treated as empty.</param>
    /// <param name="streamStore">Store used to read stream lengths, consumer offsets and consumer update times.</param>
    /// <param name="options">Optional thresholds and timeout; when absent a default <see cref="StreamHealthCheckOptions"/> is used.</param>
    /// <param name="logger">Optional logger for check errors and timeouts.</param>
    /// <exception cref="ArgumentNullException"><paramref name="streamStore"/> is <c>null</c>.</exception>
    public StreamHealthCheck(
        IEnumerable<IStreamConfiguration> streamConfigurations,
        IEnumerable<Subscription> subscriptions,
        IEventStreamStore streamStore,
        IOptions<StreamHealthCheckOptions>? options = null,
        ILogger<StreamHealthCheck>? logger = null)
    {
        _streamConfigurations = streamConfigurations ?? Enumerable.Empty<IStreamConfiguration>();
        _subscriptions = subscriptions ?? Enumerable.Empty<Subscription>();
        _streamStore = streamStore ?? throw new ArgumentNullException(nameof(streamStore));
        _logger = logger;
        _options = options?.Value ?? new StreamHealthCheckOptions();
    }

    /// <summary>
    /// Checks that a configured stream exists and that its length can be read from the store.
    /// </summary>
    /// <param name="streamName">Name of the stream to check.</param>
    /// <param name="cancellationToken">Caller cancellation token.</param>
    /// <returns>
    /// Healthy (with stream length, type, delivery semantics and scope in the data) when the
    /// length can be read; Unhealthy when the stream is not configured, the check times out,
    /// or the store throws.
    /// </returns>
    /// <exception cref="OperationCanceledException"><paramref name="cancellationToken"/> was cancelled.</exception>
    public async Task<HealthCheckResult> CheckStreamHealthAsync(string streamName, CancellationToken cancellationToken = default)
    {
        var sw = Stopwatch.StartNew();
        try
        {
            // The linked source lets the configured timeout and the caller's token both cancel store calls.
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            cts.CancelAfter(_options.HealthCheckTimeout);

            var streamConfig = FindStreamConfiguration(streamName);
            if (streamConfig == null)
            {
                return HealthCheckResult.Unhealthy(
                    $"Stream '{streamName}' does not exist",
                    duration: sw.Elapsed);
            }

            // Reading the length validates that the stream is reachable and readable.
            var streamLength = await _streamStore.GetStreamLengthAsync(streamName, cts.Token).ConfigureAwait(false);

            var data = new Dictionary<string, object>
            {
                ["streamName"] = streamName,
                ["streamLength"] = streamLength,
                ["streamType"] = streamConfig.Type.ToString(),
                ["deliverySemantics"] = streamConfig.DeliverySemantics.ToString(),
                ["scope"] = streamConfig.Scope.ToString()
            };

            return HealthCheckResult.Healthy(
                $"Stream '{streamName}' is healthy (length: {streamLength})",
                data,
                sw.Elapsed);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // The caller cancelled: propagate rather than report a health result.
            throw;
        }
        catch (OperationCanceledException)
        {
            // The caller did not cancel, so this is treated as our own timeout.
            _logger?.LogWarning(
                "Health check for stream '{StreamName}' timed out after {HealthCheckTimeout}",
                streamName,
                _options.HealthCheckTimeout);
            return HealthCheckResult.Unhealthy(
                $"Health check for stream '{streamName}' timed out after {_options.HealthCheckTimeout.TotalSeconds}s",
                duration: sw.Elapsed);
        }
        catch (Exception ex)
        {
            _logger?.LogError(ex, "Error checking health of stream '{StreamName}'", streamName);
            return HealthCheckResult.Unhealthy(
                $"Error checking stream '{streamName}': {ex.Message}",
                ex,
                duration: sw.Elapsed);
        }
    }

    /// <summary>
    /// Checks the lag and progress of one subscription on a configured stream.
    /// </summary>
    /// <param name="streamName">Name of the stream the subscription reads.</param>
    /// <param name="subscriptionName">Subscription id (matched against <c>Subscription.SubscriptionId</c>).</param>
    /// <param name="cancellationToken">Caller cancellation token.</param>
    /// <returns>
    /// Unhealthy when the stream or subscription is unknown, lag reaches the unhealthy threshold,
    /// the consumer has pending events and has not updated within the unhealthy stall threshold,
    /// the check times out, or the store throws. Degraded for the corresponding degraded
    /// thresholds. Healthy otherwise.
    /// </returns>
    /// <exception cref="OperationCanceledException"><paramref name="cancellationToken"/> was cancelled.</exception>
    public async Task<HealthCheckResult> CheckSubscriptionHealthAsync(string streamName, string subscriptionName, CancellationToken cancellationToken = default)
    {
        var sw = Stopwatch.StartNew();
        try
        {
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            cts.CancelAfter(_options.HealthCheckTimeout);

            var streamConfig = FindStreamConfiguration(streamName);
            if (streamConfig == null)
            {
                return HealthCheckResult.Unhealthy(
                    $"Stream '{streamName}' does not exist",
                    duration: sw.Elapsed);
            }

            var subscription = _subscriptions.FirstOrDefault(s => s.StreamName == streamName && s.SubscriptionId == subscriptionName);
            if (subscription == null)
            {
                return HealthCheckResult.Unhealthy(
                    $"Subscription '{subscriptionName}' does not exist on stream '{streamName}'",
                    duration: sw.Elapsed);
            }

            // Read the consumer offset BEFORE the stream length. Assuming both values only grow
            // (append-only stream, forward-moving consumer), an offset read first cannot exceed a
            // length read afterwards. The reverse order lets a consumer that advances between the
            // two reads produce a negative lag.
            var consumerOffset = await _streamStore.GetConsumerOffsetAsync(streamName, subscriptionName, cts.Token).ConfigureAwait(false);
            var streamLength = await _streamStore.GetStreamLengthAsync(streamName, cts.Token).ConfigureAwait(false);

            var lag = streamLength - consumerOffset;
            if (lag < 0)
            {
                // Defensive: never report negative lag even if the store returns an offset past the length.
                lag = 0;
            }

            // The last update time is used to detect consumers that stopped making progress.
            var lastUpdateTime = await _streamStore.GetConsumerLastUpdateTimeAsync(streamName, subscriptionName, cts.Token).ConfigureAwait(false);
            var timeSinceUpdate = DateTimeOffset.UtcNow - lastUpdateTime;

            var data = new Dictionary<string, object>
            {
                ["streamName"] = streamName,
                ["subscriptionName"] = subscriptionName,
                ["streamLength"] = streamLength,
                ["consumerOffset"] = consumerOffset,
                ["lag"] = lag,
                ["lastUpdateTime"] = lastUpdateTime,
                ["timeSinceUpdate"] = timeSinceUpdate.TotalSeconds,
                ["subscriptionMode"] = subscription.Mode.ToString()
            };

            // A consumer with no pending events is idle, not stalled, so stall checks require lag > 0.
            var hasPendingEvents = lag > 0;

            // Unhealthy conditions are checked first so the most severe status wins.
            if (lag >= _options.UnhealthyConsumerLagThreshold)
            {
                return HealthCheckResult.Unhealthy(
                    $"Subscription '{subscriptionName}' has excessive lag: {lag} events (threshold: {_options.UnhealthyConsumerLagThreshold})",
                    data: data,
                    duration: sw.Elapsed);
            }

            if (timeSinceUpdate >= _options.UnhealthyStalledThreshold && hasPendingEvents)
            {
                return HealthCheckResult.Unhealthy(
                    $"Subscription '{subscriptionName}' appears stalled: no progress for {timeSinceUpdate.TotalMinutes:F1} minutes (threshold: {_options.UnhealthyStalledThreshold.TotalMinutes} minutes)",
                    data: data,
                    duration: sw.Elapsed);
            }

            if (lag >= _options.DegradedConsumerLagThreshold)
            {
                return HealthCheckResult.Degraded(
                    $"Subscription '{subscriptionName}' has elevated lag: {lag} events (threshold: {_options.DegradedConsumerLagThreshold})",
                    data: data,
                    duration: sw.Elapsed);
            }

            if (timeSinceUpdate >= _options.DegradedStalledThreshold && hasPendingEvents)
            {
                return HealthCheckResult.Degraded(
                    $"Subscription '{subscriptionName}' has slow progress: no updates for {timeSinceUpdate.TotalMinutes:F1} minutes (threshold: {_options.DegradedStalledThreshold.TotalMinutes} minutes)",
                    data: data,
                    duration: sw.Elapsed);
            }

            return HealthCheckResult.Healthy(
                $"Subscription '{subscriptionName}' is healthy (lag: {lag} events, last update: {timeSinceUpdate.TotalSeconds:F1}s ago)",
                data,
                sw.Elapsed);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // The caller cancelled: propagate rather than report a health result.
            throw;
        }
        catch (OperationCanceledException)
        {
            // The caller did not cancel, so this is treated as our own timeout.
            _logger?.LogWarning(
                "Health check for subscription '{SubscriptionName}' on stream '{StreamName}' timed out after {HealthCheckTimeout}",
                subscriptionName,
                streamName,
                _options.HealthCheckTimeout);
            return HealthCheckResult.Unhealthy(
                $"Health check for subscription '{subscriptionName}' timed out after {_options.HealthCheckTimeout.TotalSeconds}s",
                duration: sw.Elapsed);
        }
        catch (Exception ex)
        {
            _logger?.LogError(ex, "Error checking health of subscription '{SubscriptionName}' on stream '{StreamName}'", subscriptionName, streamName);
            return HealthCheckResult.Unhealthy(
                $"Error checking subscription '{subscriptionName}': {ex.Message}",
                ex,
                duration: sw.Elapsed);
        }
    }

    /// <summary>
    /// Runs <see cref="CheckStreamHealthAsync"/> concurrently for every configured stream.
    /// </summary>
    /// <param name="cancellationToken">Caller cancellation token, forwarded to each check.</param>
    /// <returns>Results keyed by stream name.</returns>
    public async Task<IReadOnlyDictionary<string, HealthCheckResult>> CheckAllStreamsAsync(CancellationToken cancellationToken = default)
    {
        var results = new ConcurrentDictionary<string, HealthCheckResult>();
        var tasks = _streamConfigurations.Select(async stream =>
        {
            var result = await CheckStreamHealthAsync(stream.StreamName, cancellationToken).ConfigureAwait(false);
            results[stream.StreamName] = result;
        });
        await Task.WhenAll(tasks).ConfigureAwait(false);
        return results;
    }

    /// <summary>
    /// Runs <see cref="CheckSubscriptionHealthAsync"/> concurrently for every configured subscription.
    /// </summary>
    /// <param name="cancellationToken">Caller cancellation token, forwarded to each check.</param>
    /// <returns>Results keyed by <c>"{StreamName}:{SubscriptionId}"</c>.</returns>
    public async Task<IReadOnlyDictionary<string, HealthCheckResult>> CheckAllSubscriptionsAsync(CancellationToken cancellationToken = default)
    {
        var results = new ConcurrentDictionary<string, HealthCheckResult>();
        var tasks = _subscriptions.Select(async subscription =>
        {
            var result = await CheckSubscriptionHealthAsync(subscription.StreamName, subscription.SubscriptionId, cancellationToken).ConfigureAwait(false);
            var key = $"{subscription.StreamName}:{subscription.SubscriptionId}";
            results[key] = result;
        });
        await Task.WhenAll(tasks).ConfigureAwait(false);
        return results;
    }

    /// <summary>
    /// Returns the first configured stream whose <c>StreamName</c> equals <paramref name="streamName"/>, or <c>null</c>.
    /// </summary>
    private IStreamConfiguration? FindStreamConfiguration(string streamName) =>
        _streamConfigurations.FirstOrDefault(s => s.StreamName == streamName);
}