dotnet-cqrs/docs/observability/health-checks/consumer-health.md

1.5 KiB

Consumer Health

Monitor consumer lag and throughput, and detect consumers that have stalled.

Overview

Consumer health monitoring tracks:

  • Consumer lag (how many events the consumer's checkpoint is behind the stream head)
  • Processing rate (events per second)
  • Stall detection (no progress within a configured time threshold)
  • Error rates

Lag Detection

/// <summary>
/// Computes how far a consumer's checkpoint trails the head of a stream and
/// maps that lag to a <see cref="HealthStatus"/>.
/// </summary>
public class ConsumerLagMonitor
{
    /// <summary>
    /// Lag (in events) above which the consumer is reported as
    /// <see cref="HealthStatus.Degraded"/>. Defaults to 1,000.
    /// </summary>
    public long DegradedLagThreshold { get; init; } = 1000;

    /// <summary>
    /// Lag (in events) above which the consumer is reported as
    /// <see cref="HealthStatus.Unhealthy"/>. Defaults to 10,000.
    /// Checked before <see cref="DegradedLagThreshold"/>.
    /// </summary>
    public long UnhealthyLagThreshold { get; init; } = 10000;

    /// <summary>
    /// Reads the consumer's checkpoint and the stream head, and reports the
    /// difference together with a health classification.
    /// </summary>
    /// <param name="streamName">Stream whose head position is read.</param>
    /// <param name="consumerId">Consumer whose checkpoint is read.</param>
    /// <param name="ct">Token forwarded to both store reads.</param>
    /// <returns>
    /// A <see cref="ConsumerLagResult"/> with the checkpoint, stream head,
    /// lag (<c>streamHead - checkpoint</c>) and status: Unhealthy when lag
    /// exceeds <see cref="UnhealthyLagThreshold"/>, Degraded when it exceeds
    /// <see cref="DegradedLagThreshold"/>, otherwise Healthy.
    /// </returns>
    public async Task<ConsumerLagResult> CheckLagAsync(
        string streamName,
        string consumerId,
        CancellationToken ct)
    {
        // The checkpoint is read before the head. Assuming the stream head only
        // advances (append-only store — confirm), reading the head second means
        // consumer progress between the two reads cannot make the lag negative.
        var checkpoint = await _checkpointStore.GetCheckpointAsync(consumerId, ct);
        var streamHead = await _eventStore.GetStreamHeadAsync(streamName, ct);
        var lag = streamHead - checkpoint;

        return new ConsumerLagResult
        {
            ConsumerId = consumerId,
            Checkpoint = checkpoint,
            StreamHead = streamHead,
            Lag = lag,
            // Most severe threshold first; anything at or below the degraded
            // threshold is considered healthy.
            Status = lag > UnhealthyLagThreshold
                ? HealthStatus.Unhealthy
                : lag > DegradedLagThreshold
                    ? HealthStatus.Degraded
                    : HealthStatus.Healthy
        };
    }
}

Stall Detection

/// <summary>
/// Decides whether a consumer has stopped making progress, based on how long
/// ago its last update was recorded.
/// </summary>
public class ConsumerStallDetector
{
    /// <summary>
    /// Returns <c>true</c> when strictly more than <paramref name="threshold"/>
    /// has elapsed between the consumer's last recorded update and now (UTC).
    /// </summary>
    /// <param name="consumerId">Consumer whose last update time is looked up.</param>
    /// <param name="threshold">Maximum idle time before the consumer counts as stalled.</param>
    /// <param name="ct">Token forwarded to the last-update lookup.</param>
    /// <returns><c>true</c> if the consumer is considered stalled; otherwise <c>false</c>.</returns>
    public async Task<bool> IsConsumerStalledAsync(
        string consumerId,
        TimeSpan threshold,
        CancellationToken ct)
    {
        var lastProgressAt = await GetLastUpdateTimeAsync(consumerId, ct);

        // "Now" is sampled only after the lookup completes, so lookup latency
        // is counted as idle time.
        var idleFor = DateTimeOffset.UtcNow - lastProgressAt;
        var stalled = idleFor > threshold;

        return stalled;
    }
}

See Also