// C# source file: 233 lines, 9.8 KiB
using System;
|
|
using Svrnty.CQRS.Events.HealthCheck;
|
|
using Svrnty.CQRS.Events.Abstractions.Models;
|
|
using Svrnty.CQRS.Events.Abstractions.EventStore;
|
|
using Svrnty.CQRS.Events.Abstractions.Streaming;
|
|
using Svrnty.CQRS.Events.Subscriptions;
|
|
using Svrnty.CQRS.Events.Abstractions.Configuration;
|
|
using System.Collections.Concurrent;
|
|
using System.Collections.Generic;
|
|
using System.Diagnostics;
|
|
using System.Linq;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using Microsoft.Extensions.Logging;
|
|
using Microsoft.Extensions.Options;
|
|
using Svrnty.CQRS.Events.Abstractions;
|
|
|
|
namespace Svrnty.CQRS.Events.HealthCheck;
|
|
|
|
/// <summary>
/// Implementation of stream health checks.
/// Looks up streams and subscriptions in the injected configuration collections and
/// queries the <see cref="IEventStreamStore"/> for stream length, consumer offset and
/// last consumer update time, comparing the results against the thresholds in
/// <see cref="StreamHealthCheckOptions"/>.
/// </summary>
public sealed class StreamHealthCheck : IStreamHealthCheck
{
    private readonly IEnumerable<IStreamConfiguration> _streamConfigurations;
    private readonly IEnumerable<Subscription> _subscriptions;
    private readonly IEventStreamStore _streamStore;
    private readonly ILogger<StreamHealthCheck>? _logger;
    private readonly StreamHealthCheckOptions _options;

    /// <summary>
    /// Creates a new <see cref="StreamHealthCheck"/>.
    /// </summary>
    /// <param name="streamConfigurations">Known stream configurations; <c>null</c> is treated as an empty set.</param>
    /// <param name="subscriptions">Known subscriptions; <c>null</c> is treated as an empty set.</param>
    /// <param name="streamStore">Store queried for stream length and consumer progress.</param>
    /// <param name="options">Thresholds and timeout; a default <see cref="StreamHealthCheckOptions"/> is used when omitted.</param>
    /// <param name="logger">Optional logger used to record unexpected failures.</param>
    /// <exception cref="ArgumentNullException"><paramref name="streamStore"/> is <c>null</c>.</exception>
    public StreamHealthCheck(
        IEnumerable<IStreamConfiguration> streamConfigurations,
        IEnumerable<Subscription> subscriptions,
        IEventStreamStore streamStore,
        IOptions<StreamHealthCheckOptions>? options = null,
        ILogger<StreamHealthCheck>? logger = null)
    {
        _streamConfigurations = streamConfigurations ?? Enumerable.Empty<IStreamConfiguration>();
        _subscriptions = subscriptions ?? Enumerable.Empty<Subscription>();
        _streamStore = streamStore ?? throw new ArgumentNullException(nameof(streamStore));
        _logger = logger;
        _options = options?.Value ?? new StreamHealthCheckOptions();
    }

    /// <summary>
    /// Checks that the named stream is configured and that its length can be read from the store.
    /// </summary>
    /// <param name="streamName">Name of the stream to check.</param>
    /// <param name="cancellationToken">Token used by the caller to abort the check.</param>
    /// <returns>
    /// A healthy result carrying stream metadata, or an unhealthy result when the stream is not
    /// configured, the check exceeds the configured timeout, or the store call fails.
    /// </returns>
    /// <exception cref="OperationCanceledException">
    /// The check was cancelled and <paramref name="cancellationToken"/> has been signalled.
    /// </exception>
    public async Task<HealthCheckResult> CheckStreamHealthAsync(string streamName, CancellationToken cancellationToken = default)
    {
        var sw = Stopwatch.StartNew();

        // Declared outside the try block so the catch filters can tell a real timeout
        // (this source was cancelled) apart from an unrelated OperationCanceledException.
        CancellationTokenSource? timeoutSource = null;
        try
        {
            timeoutSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            timeoutSource.CancelAfter(_options.HealthCheckTimeout);

            var streamConfig = _streamConfigurations.FirstOrDefault(s => s.StreamName == streamName);
            if (streamConfig == null)
            {
                return HealthCheckResult.Unhealthy(
                    $"Stream '{streamName}' does not exist",
                    duration: sw.Elapsed);
            }

            // Check if we can get the stream length (validates stream is readable)
            var streamLength = await _streamStore.GetStreamLengthAsync(streamName, timeoutSource.Token).ConfigureAwait(false);

            var data = new Dictionary<string, object>
            {
                ["streamName"] = streamName,
                ["streamLength"] = streamLength,
                ["streamType"] = streamConfig.Type.ToString(),
                ["deliverySemantics"] = streamConfig.DeliverySemantics.ToString(),
                ["scope"] = streamConfig.Scope.ToString()
            };

            return HealthCheckResult.Healthy(
                $"Stream '{streamName}' is healthy (length: {streamLength})",
                data,
                sw.Elapsed);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Caller-requested cancellation is propagated, not converted into a health result.
            throw;
        }
        catch (OperationCanceledException) when (timeoutSource?.IsCancellationRequested == true)
        {
            // The caller's token is not signalled here, so the linked source can only have
            // been cancelled by CancelAfter: this is a genuine timeout.
            return HealthCheckResult.Unhealthy(
                $"Health check for stream '{streamName}' timed out after {_options.HealthCheckTimeout.TotalSeconds}s",
                duration: sw.Elapsed);
        }
        catch (Exception ex)
        {
            _logger?.LogError(ex, "Error checking health of stream '{StreamName}'", streamName);
            return HealthCheckResult.Unhealthy(
                $"Error checking stream '{streamName}': {ex.Message}",
                ex,
                duration: sw.Elapsed);
        }
        finally
        {
            timeoutSource?.Dispose();
        }
    }

    /// <summary>
    /// Checks the health of a subscription on a stream using its lag (stream length minus
    /// consumer offset) and the time elapsed since the consumer's last recorded update.
    /// </summary>
    /// <param name="streamName">Name of the stream the subscription belongs to.</param>
    /// <param name="subscriptionName">Identifier of the subscription to check.</param>
    /// <param name="cancellationToken">Token used by the caller to abort the check.</param>
    /// <returns>
    /// Unhealthy when the stream or subscription is unknown, when an unhealthy lag or stall
    /// threshold is reached, when the check exceeds the configured timeout, or when a store
    /// call fails; degraded when only a degraded threshold is reached; healthy otherwise.
    /// </returns>
    /// <exception cref="OperationCanceledException">
    /// The check was cancelled and <paramref name="cancellationToken"/> has been signalled.
    /// </exception>
    public async Task<HealthCheckResult> CheckSubscriptionHealthAsync(string streamName, string subscriptionName, CancellationToken cancellationToken = default)
    {
        var sw = Stopwatch.StartNew();

        // Declared outside the try block so the catch filters can tell a real timeout
        // (this source was cancelled) apart from an unrelated OperationCanceledException.
        CancellationTokenSource? timeoutSource = null;
        try
        {
            timeoutSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            timeoutSource.CancelAfter(_options.HealthCheckTimeout);

            var streamConfig = _streamConfigurations.FirstOrDefault(s => s.StreamName == streamName);
            if (streamConfig == null)
            {
                return HealthCheckResult.Unhealthy(
                    $"Stream '{streamName}' does not exist",
                    duration: sw.Elapsed);
            }

            var subscription = _subscriptions.FirstOrDefault(s => s.StreamName == streamName && s.SubscriptionId == subscriptionName);
            if (subscription == null)
            {
                return HealthCheckResult.Unhealthy(
                    $"Subscription '{subscriptionName}' does not exist on stream '{streamName}'",
                    duration: sw.Elapsed);
            }

            // Get stream length and consumer offset
            var streamLength = await _streamStore.GetStreamLengthAsync(streamName, timeoutSource.Token).ConfigureAwait(false);
            var consumerOffset = await _streamStore.GetConsumerOffsetAsync(streamName, subscriptionName, timeoutSource.Token).ConfigureAwait(false);

            // Calculate lag
            var lag = streamLength - consumerOffset;

            // Get last update time to detect stalled consumers
            var lastUpdateTime = await _streamStore.GetConsumerLastUpdateTimeAsync(streamName, subscriptionName, timeoutSource.Token).ConfigureAwait(false);
            var timeSinceUpdate = DateTimeOffset.UtcNow - lastUpdateTime;

            var data = new Dictionary<string, object>
            {
                ["streamName"] = streamName,
                ["subscriptionName"] = subscriptionName,
                ["streamLength"] = streamLength,
                ["consumerOffset"] = consumerOffset,
                ["lag"] = lag,
                ["lastUpdateTime"] = lastUpdateTime,
                ["timeSinceUpdate"] = timeSinceUpdate.TotalSeconds,
                ["subscriptionMode"] = subscription.Mode.ToString()
            };

            // Unhealthy conditions are evaluated before degraded ones so the most severe
            // status wins when several thresholds are reached at once.
            if (lag >= _options.UnhealthyConsumerLagThreshold)
            {
                return HealthCheckResult.Unhealthy(
                    $"Subscription '{subscriptionName}' has excessive lag: {lag} events (threshold: {_options.UnhealthyConsumerLagThreshold})",
                    data: data,
                    duration: sw.Elapsed);
            }

            // A quiet consumer only counts as stalled while there are still unread events.
            if (timeSinceUpdate >= _options.UnhealthyStalledThreshold && streamLength > consumerOffset)
            {
                return HealthCheckResult.Unhealthy(
                    $"Subscription '{subscriptionName}' appears stalled: no progress for {timeSinceUpdate.TotalMinutes:F1} minutes (threshold: {_options.UnhealthyStalledThreshold.TotalMinutes} minutes)",
                    data: data,
                    duration: sw.Elapsed);
            }

            // Check for degraded conditions
            if (lag >= _options.DegradedConsumerLagThreshold)
            {
                return HealthCheckResult.Degraded(
                    $"Subscription '{subscriptionName}' has elevated lag: {lag} events (threshold: {_options.DegradedConsumerLagThreshold})",
                    data: data,
                    duration: sw.Elapsed);
            }

            if (timeSinceUpdate >= _options.DegradedStalledThreshold && streamLength > consumerOffset)
            {
                return HealthCheckResult.Degraded(
                    $"Subscription '{subscriptionName}' has slow progress: no updates for {timeSinceUpdate.TotalMinutes:F1} minutes (threshold: {_options.DegradedStalledThreshold.TotalMinutes} minutes)",
                    data: data,
                    duration: sw.Elapsed);
            }

            return HealthCheckResult.Healthy(
                $"Subscription '{subscriptionName}' is healthy (lag: {lag} events, last update: {timeSinceUpdate.TotalSeconds:F1}s ago)",
                data,
                sw.Elapsed);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Caller-requested cancellation is propagated, not converted into a health result.
            throw;
        }
        catch (OperationCanceledException) when (timeoutSource?.IsCancellationRequested == true)
        {
            // The caller's token is not signalled here, so the linked source can only have
            // been cancelled by CancelAfter: this is a genuine timeout.
            return HealthCheckResult.Unhealthy(
                $"Health check for subscription '{subscriptionName}' timed out after {_options.HealthCheckTimeout.TotalSeconds}s",
                duration: sw.Elapsed);
        }
        catch (Exception ex)
        {
            _logger?.LogError(ex, "Error checking health of subscription '{SubscriptionName}' on stream '{StreamName}'", subscriptionName, streamName);
            return HealthCheckResult.Unhealthy(
                $"Error checking subscription '{subscriptionName}': {ex.Message}",
                ex,
                duration: sw.Elapsed);
        }
        finally
        {
            timeoutSource?.Dispose();
        }
    }

    /// <summary>
    /// Runs <see cref="CheckStreamHealthAsync"/> for every configured stream and awaits all
    /// of the checks together.
    /// </summary>
    /// <param name="cancellationToken">Token passed to each individual stream check.</param>
    /// <returns>The result of each check, keyed by stream name.</returns>
    public async Task<IReadOnlyDictionary<string, HealthCheckResult>> CheckAllStreamsAsync(CancellationToken cancellationToken = default)
    {
        var results = new ConcurrentDictionary<string, HealthCheckResult>();

        var tasks = _streamConfigurations.Select(async stream =>
        {
            var result = await CheckStreamHealthAsync(stream.StreamName, cancellationToken).ConfigureAwait(false);
            results[stream.StreamName] = result;
        });

        await Task.WhenAll(tasks).ConfigureAwait(false);
        return results;
    }

    /// <summary>
    /// Runs <see cref="CheckSubscriptionHealthAsync"/> for every known subscription and awaits
    /// all of the checks together.
    /// </summary>
    /// <param name="cancellationToken">Token passed to each individual subscription check.</param>
    /// <returns>The result of each check, keyed by <c>"{StreamName}:{SubscriptionId}"</c>.</returns>
    public async Task<IReadOnlyDictionary<string, HealthCheckResult>> CheckAllSubscriptionsAsync(CancellationToken cancellationToken = default)
    {
        var results = new ConcurrentDictionary<string, HealthCheckResult>();

        var tasks = _subscriptions.Select(async subscription =>
        {
            var result = await CheckSubscriptionHealthAsync(subscription.StreamName, subscription.SubscriptionId, cancellationToken).ConfigureAwait(false);
            var key = $"{subscription.StreamName}:{subscription.SubscriptionId}";
            results[key] = result;
        });

        await Task.WhenAll(tasks).ConfigureAwait(false);
        return results;
    }
}
|