14 KiB
14 KiB
Health Checks
Monitor stream and subscription health with ASP.NET Core health checks.
Overview
Health checks provide visibility into system health:
- Stream Health - Detect unhealthy event streams
- Consumer Health - Monitor consumer lag and stalls
- Subscription Health - Track subscription status
- ASP.NET Core Integration - Built-in health check support
Key Features:
- ✅ Stream Monitoring - Check stream availability and status
- ✅ Consumer Lag Detection - Identify lagging consumers
- ✅ Stall Detection - Detect consumers with no progress
- ✅ Configurable Thresholds - Define degraded/unhealthy limits
- ✅ Health Check UI - Visual dashboard support
- ✅ Kubernetes Ready - /health/live and /health/ready endpoints for liveness/readiness probes
Quick Start
using Svrnty.CQRS.Events;
using Svrnty.CQRS.Events.Monitoring;
var builder = WebApplication.CreateBuilder(args);
var services = builder.Services;

// Thresholds that decide when a consumer is reported as degraded or unhealthy.
services.AddStreamHealthChecks(thresholds =>
{
    // Lag, measured in events
    thresholds.DegradedConsumerLagThreshold = 1000;    // Warning at 1000 events
    thresholds.UnhealthyConsumerLagThreshold = 10000;  // Error at 10000 events

    // Time without progress
    thresholds.DegradedStalledThreshold = TimeSpan.FromMinutes(5);    // Warning after 5 min
    thresholds.UnhealthyStalledThreshold = TimeSpan.FromMinutes(15);  // Error after 15 min
});

// Plug both checks into the ASP.NET Core health check pipeline.
var healthChecks = services.AddHealthChecks();
healthChecks.AddCheck<StreamHealthCheck>("event-streams");
healthChecks.AddCheck<ConsumerHealthCheck>("consumers");

var app = builder.Build();

// Single endpoint that reports the aggregate status of all registered checks.
app.MapHealthChecks("/health");

app.Run();
Health Check Components
Stream Health Check
Monitor overall stream health:
/// <summary>
/// Health check that aggregates the per-stream results returned by
/// <c>IStreamHealthService.CheckAllStreamsAsync</c> into a single status.
/// </summary>
public class StreamHealthCheck : IHealthCheck
{
    // NOTE(review): never assigned in this snippet - presumably supplied through
    // constructor injection that is omitted here; confirm before copying.
    private readonly IStreamHealthService _healthService;

    /// <summary>
    /// Reports Unhealthy when at least one stream is unhealthy, otherwise Degraded when
    /// at least one stream is degraded, otherwise Healthy.
    /// </summary>
    /// <param name="context">Health check context supplied by the framework (not read here).</param>
    /// <param name="ct">Token forwarded to the stream health service.</param>
    /// <returns>The aggregated result; non-healthy results carry the stream counts as data.</returns>
    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken ct = default)
    {
        var summary = await _healthService.CheckAllStreamsAsync(ct);

        var unhealthy = summary.UnhealthyCount;
        var degraded = summary.DegradedCount;
        var healthy = summary.HealthyCount;

        // Worst status wins: a single unhealthy stream fails the whole check.
        if (unhealthy > 0)
        {
            var counts = new Dictionary<string, object>
            {
                ["healthy"] = healthy,
                ["degraded"] = degraded,
                ["unhealthy"] = unhealthy
            };

            return HealthCheckResult.Unhealthy($"{unhealthy} unhealthy streams", data: counts);
        }

        if (degraded > 0)
        {
            var counts = new Dictionary<string, object>
            {
                ["healthy"] = healthy,
                ["degraded"] = degraded
            };

            return HealthCheckResult.Degraded($"{degraded} degraded streams", data: counts);
        }

        return HealthCheckResult.Healthy($"{healthy} healthy streams");
    }
}
Consumer Health Check
Monitor consumer lag and stalls:
/// <summary>
/// Health check that classifies every consumer by how long it has gone without an
/// update (stall) and by how far it is behind (lag), using the thresholds in
/// <c>HealthCheckOptions</c>.
/// </summary>
public class ConsumerHealthCheck : IHealthCheck
{
    // NOTE(review): neither field is assigned in this snippet - presumably supplied
    // through constructor injection that is omitted here; confirm before copying.
    private readonly IConsumerHealthService _healthService;

    // The threshold options class documented on this page (lag / stall limits),
    // not the ASP.NET Core endpoint options type of the same name.
    private readonly HealthCheckOptions _options;

    /// <summary>
    /// Evaluates stall time and lag independently for each consumer and files the
    /// consumer under the worst of the two severities.
    /// </summary>
    /// <remarks>
    /// Stall and lag are deliberately not chained with <c>else if</c>: a consumer that is
    /// only mildly stalled but critically lagging must be reported as unhealthy, not degraded.
    /// When both conditions apply, both reasons appear in the entry,
    /// e.g. <c>orders (stalled 6m, lag 50000)</c>.
    /// </remarks>
    /// <param name="context">Health check context supplied by the framework (not read here).</param>
    /// <param name="ct">Token forwarded to the consumer health service.</param>
    /// <returns>
    /// Unhealthy if any consumer exceeds an unhealthy threshold, otherwise Degraded if any
    /// consumer exceeds a degraded threshold, otherwise Healthy.
    /// </returns>
    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken ct = default)
    {
        var consumers = await _healthService.GetAllConsumersAsync(ct);

        // One clock reading for the whole pass so every consumer is measured against the same instant.
        var now = DateTimeOffset.UtcNow;

        var unhealthyConsumers = new List<string>();
        var degradedConsumers = new List<string>();

        foreach (var consumer in consumers)
        {
            var lag = consumer.Lag;
            var timeSinceUpdate = now - consumer.LastUpdated;

            var stallUnhealthy = timeSinceUpdate > _options.UnhealthyStalledThreshold;
            var stallDegraded = timeSinceUpdate > _options.DegradedStalledThreshold;
            var lagUnhealthy = lag > _options.UnhealthyConsumerLagThreshold;
            var lagDegraded = lag > _options.DegradedConsumerLagThreshold;

            // Collect every reason that applies, regardless of which one is more severe.
            var reasons = new List<string>(2);
            if (stallUnhealthy || stallDegraded)
            {
                reasons.Add($"stalled {timeSinceUpdate.TotalMinutes:F0}m");
            }

            if (lagUnhealthy || lagDegraded)
            {
                reasons.Add($"lag {lag}");
            }

            if (reasons.Count == 0)
            {
                continue;
            }

            var entry = $"{consumer.ConsumerId} ({string.Join(", ", reasons)})";

            // Worst severity wins.
            if (stallUnhealthy || lagUnhealthy)
            {
                unhealthyConsumers.Add(entry);
            }
            else
            {
                degradedConsumers.Add(entry);
            }
        }

        if (unhealthyConsumers.Count > 0)
        {
            return HealthCheckResult.Unhealthy(
                $"{unhealthyConsumers.Count} unhealthy consumers",
                data: new Dictionary<string, object>
                {
                    ["unhealthy_consumers"] = unhealthyConsumers,
                    ["degraded_consumers"] = degradedConsumers
                });
        }

        if (degradedConsumers.Count > 0)
        {
            return HealthCheckResult.Degraded(
                $"{degradedConsumers.Count} degraded consumers",
                data: new Dictionary<string, object>
                {
                    ["degraded_consumers"] = degradedConsumers
                });
        }

        return HealthCheckResult.Healthy($"{consumers.Count} healthy consumers");
    }
}
Configuration Options
/// <summary>
/// Thresholds used to decide when consumers and streams are reported as degraded or unhealthy.
/// </summary>
/// <remarks>
/// NOTE(review): the endpoint snippets on this page also use a type called
/// <c>HealthCheckOptions</c> with <c>ResponseWriter</c>/<c>Predicate</c> members; that must be a
/// different type, so expect to disambiguate by namespace when both are in scope.
/// </remarks>
public class HealthCheckOptions
{
    /// <summary>Consumer lag, in events, above which a consumer is degraded. Default: 1000.</summary>
    public long DegradedConsumerLagThreshold { get; set; } = 1_000;

    /// <summary>Consumer lag, in events, above which a consumer is unhealthy. Default: 10000.</summary>
    public long UnhealthyConsumerLagThreshold { get; set; } = 10_000;

    /// <summary>Time without progress after which a consumer is degraded. Default: 5 minutes.</summary>
    public TimeSpan DegradedStalledThreshold { get; set; } = TimeSpan.FromMinutes(5);

    /// <summary>Time without progress after which a consumer is unhealthy. Default: 15 minutes.</summary>
    public TimeSpan UnhealthyStalledThreshold { get; set; } = TimeSpan.FromMinutes(15);

    /// <summary>Stream error-rate threshold, in errors per minute. Default: 5.</summary>
    public int MaxErrorRate { get; set; } = 5;

    /// <summary>
    /// Stream health threshold. Default: 5 minutes.
    /// NOTE(review): presumably the time after which a silent stream counts as unresponsive - confirm.
    /// </summary>
    public TimeSpan StreamUnresponsiveTimeout { get; set; } = TimeSpan.FromMinutes(5);
}
Health Check Endpoints
Basic Health Endpoint
app.MapHealthChecks("/health");
// Returns (default ASP.NET Core status-code mapping):
// 200 OK: Healthy or Degraded
// 503 Service Unavailable: Unhealthy
// To answer 503 for Degraded as well, override ResultStatusCodes on the endpoint's options.
Detailed Health Endpoint
// Detailed endpoint: serialises the whole health report, one entry per registered check.
// NOTE(review): this HealthCheckOptions is the endpoint options type (it has ResponseWriter),
// not the threshold options class of the same name - disambiguate if both are in scope.
app.MapHealthChecks("/health/detail", new HealthCheckOptions
{
    ResponseWriter = async (httpContext, healthReport) =>
    {
        httpContext.Response.ContentType = "application/json";

        // Per-check breakdown; evaluated lazily when the payload is serialised.
        var checks = healthReport.Entries.Select(entry => new
        {
            name = entry.Key,
            status = entry.Value.Status.ToString(),
            duration = entry.Value.Duration.TotalMilliseconds,
            description = entry.Value.Description,
            data = entry.Value.Data,
            exception = entry.Value.Exception?.Message
        });

        var payload = new
        {
            status = healthReport.Status.ToString(),
            totalDuration = healthReport.TotalDuration.TotalMilliseconds,
            checks
        };

        await httpContext.Response.WriteAsJsonAsync(payload);
    }
});
// Returns detailed JSON response:
// {
// "status": "Healthy",
// "totalDuration": 45.2,
// "checks": [
// {
// "name": "event-streams",
// "status": "Healthy",
// "duration": 23.1,
// "description": "5 healthy streams",
// "data": { "healthy": 5, "degraded": 0, "unhealthy": 0 }
// }
// ]
// }
Liveness vs Readiness
// Liveness - is the app running? Selects the checks tagged "live".
var livenessOptions = new HealthCheckOptions
{
    Predicate = registration => registration.Tags.Contains("live")
};
app.MapHealthChecks("/health/live", livenessOptions);

// Readiness - can the app serve traffic? Selects the checks tagged "ready".
var readinessOptions = new HealthCheckOptions
{
    Predicate = registration => registration.Tags.Contains("ready")
};
app.MapHealthChecks("/health/ready", readinessOptions);

// Registration with tags: stream and consumer checks gate readiness,
// while the trivial "self" check backs the liveness endpoint.
var healthChecks = builder.Services.AddHealthChecks();
healthChecks.AddCheck<StreamHealthCheck>("event-streams", tags: new[] { "ready" });
healthChecks.AddCheck<ConsumerHealthCheck>("consumers", tags: new[] { "ready" });
healthChecks.AddCheck("self", () => HealthCheckResult.Healthy(), tags: new[] { "live" });
Kubernetes Integration
# deployment.yaml
# Wires the liveness and readiness probes to the /health/live and /health/ready endpoints.
# NOTE(review): excerpt only - spec.selector and the pod template labels are not shown
# here; add them before applying this manifest.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: event-processor
spec:
  template:
    spec:
      containers:
        - name: event-processor
          image: event-processor:latest
          ports:
            - containerPort: 80
          # Liveness: probes /health/live every 10 s, 5 s timeout, 3 failures allowed.
          livenessProbe:
            httpGet:
              path: /health/live
              port: 80
            initialDelaySeconds: 10
            periodSeconds: 10
            timeoutSeconds: 5
            failureThreshold: 3
          # Readiness: probes /health/ready every 5 s, 3 s timeout, 3 failures allowed.
          readinessProbe:
            httpGet:
              path: /health/ready
              port: 80
            initialDelaySeconds: 5
            periodSeconds: 5
            timeoutSeconds: 3
            failureThreshold: 3
Health Check UI
// Required NuGet packages for the Health Checks UI:
// dotnet add package AspNetCore.HealthChecks.UI
// dotnet add package AspNetCore.HealthChecks.UI.Client
// dotnet add package AspNetCore.HealthChecks.UI.InMemory.Storage
var healthChecks = builder.Services.AddHealthChecks();
healthChecks.AddCheck<StreamHealthCheck>("event-streams");
healthChecks.AddCheck<ConsumerHealthCheck>("consumers");

// UI services, backed by in-memory storage.
var healthChecksUi = builder.Services.AddHealthChecksUI();
healthChecksUi.AddInMemoryStorage();

var app = builder.Build();

// Serve /health through the UI client's response writer.
var uiEndpointOptions = new HealthCheckOptions
{
    ResponseWriter = UIResponseWriter.WriteHealthCheckUIResponse
};
app.MapHealthChecks("/health", uiEndpointOptions);

app.MapHealthChecksUI(ui => ui.UIPath = "/health-ui");
// Access UI at: http://localhost:5000/health-ui
Custom Health Checks
Projection Health Check
/// <summary>
/// Health check that compares the "order-summary" checkpoint with the head of the
/// "orders" stream and reports how far the projection is behind.
/// </summary>
public class ProjectionHealthCheck : IHealthCheck
{
    // NOTE(review): neither field is assigned in this snippet - presumably supplied
    // through constructor injection that is omitted here; confirm before copying.
    private readonly ICheckpointStore _checkpointStore;
    private readonly IEventStreamStore _eventStore;

    /// <summary>
    /// Reports Unhealthy when the lag exceeds 10000 events, Degraded when it exceeds
    /// 1000 events, and Healthy otherwise.
    /// </summary>
    /// <param name="context">Health check context supplied by the framework (not read here).</param>
    /// <param name="ct">Token forwarded to the checkpoint store and the event store.</param>
    /// <returns>The projection status; non-healthy results carry checkpoint, stream head and lag.</returns>
    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken ct = default)
    {
        var position = await _checkpointStore.GetCheckpointAsync("order-summary", ct);
        var head = await _eventStore.GetStreamHeadAsync("orders", ct);
        var behind = head - position;

        // Diagnostic payload shared by both non-healthy outcomes.
        Dictionary<string, object> Details() => new Dictionary<string, object>
        {
            ["checkpoint"] = position,
            ["stream_head"] = head,
            ["lag"] = behind
        };

        if (behind > 10000)
        {
            var message = $"Projection critically lagging: {behind} events behind";
            return HealthCheckResult.Unhealthy(message, data: Details());
        }

        if (behind > 1000)
        {
            var message = $"Projection lagging: {behind} events behind";
            return HealthCheckResult.Degraded(message, data: Details());
        }

        return HealthCheckResult.Healthy($"Projection up-to-date (lag: {behind})");
    }
}
Database Health Check
/// <summary>
/// Health check that pings the event store and reports whether the PostgreSQL
/// connection answered within a fixed timeout.
/// </summary>
public class PostgresHealthCheck : IHealthCheck
{
    // Upper bound for a single ping.
    private static readonly TimeSpan PingTimeout = TimeSpan.FromSeconds(5);

    // NOTE(review): never assigned in this snippet - presumably supplied through
    // constructor injection that is omitted here; confirm before copying.
    private readonly IEventStreamStore _eventStore;

    /// <summary>
    /// Pings the event store, giving up after 5 seconds or as soon as
    /// <paramref name="ct"/> is cancelled, whichever comes first.
    /// </summary>
    /// <param name="context">Health check context supplied by the framework (not read here).</param>
    /// <param name="ct">Caller's cancellation token; linked into the ping so it is honoured.</param>
    /// <returns>Healthy when the ping succeeds; Unhealthy (with the exception) when it fails or times out.</returns>
    /// <exception cref="OperationCanceledException">
    /// The caller cancelled <paramref name="ct"/>; that is not a database failure, so it is rethrown.
    /// </exception>
    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken ct = default)
    {
        try
        {
            // Link to the caller's token so shutdown or an outer timeout stops the ping,
            // then layer the local timeout on top.
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
            cts.CancelAfter(PingTimeout);

            await _eventStore.PingAsync(cts.Token);
            return HealthCheckResult.Healthy("PostgreSQL connection healthy");
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            // Cancelled by the caller, not by the local timeout.
            throw;
        }
        catch (Exception ex)
        {
            return HealthCheckResult.Unhealthy(
                "PostgreSQL connection failed",
                exception: ex);
        }
    }
}
Monitoring and Alerting
Prometheus Integration
// Export health check status as metrics
/// <summary>
/// Publishes one gauge per health check: 1 for Healthy, 0.5 for Degraded, 0 otherwise.
/// </summary>
public class HealthCheckMetricsPublisher : IHealthCheckPublisher
{
    // NOTE(review): never assigned in this snippet - presumably supplied through
    // constructor injection that is omitted here; confirm before copying.
    private readonly IMetrics _metrics;

    /// <summary>
    /// Records a gauge named <c>health_check_&lt;check name&gt;</c> for every entry in the report.
    /// </summary>
    /// <param name="report">Health report whose entries are exported.</param>
    /// <param name="ct">Cancellation token (not used; recording is synchronous).</param>
    /// <returns>A completed task.</returns>
    public Task PublishAsync(HealthReport report, CancellationToken ct)
    {
        foreach (var entry in report.Entries)
        {
            double status = entry.Value.Status switch
            {
                HealthStatus.Healthy => 1.0,
                HealthStatus.Degraded => 0.5,
                _ => 0.0 // Unhealthy and any unknown status
            };

            // Check names such as "event-streams" contain characters that are not
            // legal in a Prometheus metric name, so sanitise before recording.
            _metrics.RecordGauge($"health_check_{ToMetricNameSegment(entry.Key)}", status);
        }

        return Task.CompletedTask;
    }

    // Replaces every character outside [A-Za-z0-9_] with '_' so the result is safe inside a metric name.
    private static string ToMetricNameSegment(string checkName)
    {
        var chars = checkName.ToCharArray();
        for (var i = 0; i < chars.Length; i++)
        {
            var c = chars[i];
            var allowed = (c >= 'a' && c <= 'z')
                || (c >= 'A' && c <= 'Z')
                || (c >= '0' && c <= '9')
                || c == '_';

            if (!allowed)
            {
                chars[i] = '_';
            }
        }

        return new string(chars);
    }
}
// Register publisher
builder.Services.AddSingleton<IHealthCheckPublisher, HealthCheckMetricsPublisher>();
Alert on Unhealthy
/// <summary>
/// Sends a critical alert whenever the overall health report is Unhealthy.
/// </summary>
public class HealthCheckAlertPublisher : IHealthCheckPublisher
{
    // NOTE(review): never assigned in this snippet - presumably supplied through
    // constructor injection that is omitted here; confirm before copying.
    private readonly IAlertService _alertService;

    /// <summary>
    /// Sends one alert listing the names of all unhealthy checks; does nothing for
    /// Healthy or Degraded reports.
    /// </summary>
    /// <param name="report">Health report to inspect.</param>
    /// <param name="ct">Cancellation token (not forwarded to the alert service here).</param>
    public async Task PublishAsync(HealthReport report, CancellationToken ct)
    {
        if (report.Status != HealthStatus.Unhealthy)
        {
            return;
        }

        // Names of the checks that caused the overall Unhealthy status.
        var failedChecks = report.Entries
            .Where(entry => entry.Value.Status == HealthStatus.Unhealthy)
            .Select(entry => entry.Key);

        var alert = new Alert
        {
            Severity = AlertSeverity.Critical,
            Title = "System Unhealthy",
            Description = $"Health check failed: {string.Join(", ", failedChecks)}",
            Timestamp = DateTimeOffset.UtcNow
        };

        await _alertService.SendAsync(alert);
    }
}
Best Practices
✅ DO
- Configure health checks for production
- Use appropriate thresholds for your workload
- Separate liveness and readiness probes
- Monitor health check metrics
- Set up alerts for unhealthy status
- Include health checks in deployment strategy
- Test health check behavior
- Document health check meanings
❌ DON'T
- Don't use the same thresholds for all systems
- Don't ignore degraded status
- Don't skip health checks in production
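- Don't make health checks too slow (> 5s) - a check that runs longer than the probe's timeoutSeconds (3-5 s in the Kubernetes example) is counted as a failed probe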
- Don't forget to handle timeouts
- Don't expose sensitive data in health responses
- Don't use health checks for business logic
- Don't forget to test failure scenarios