# Consumer Health

Monitor consumer lag and throughput, and detect stalled consumers.

## Overview

Consumer health monitoring tracks:

- Consumer lag (events behind)
- Processing rate (events/sec)
- Stall detection (no progress)
- Error rates

## Lag Detection

```csharp
/// <summary>
/// Measures how far a consumer trails the head of a stream and grades that
/// gap as a <see cref="HealthStatus"/>.
/// </summary>
/// <remarks>
/// The <c>_checkpointStore</c> and <c>_eventStore</c> members used below are
/// not declared in this snippet; they are expected to be supplied by the
/// surrounding class definition.
/// </remarks>
public class ConsumerLagMonitor
{
    // A lag strictly greater than this value is reported as Degraded.
    private const int DegradedLagThreshold = 1000;

    // A lag strictly greater than this value is reported as Unhealthy.
    private const int UnhealthyLagThreshold = 10000;

    /// <summary>
    /// Reads the consumer's checkpoint and the stream head, and reports the
    /// difference between them (head minus checkpoint) as the consumer's lag.
    /// </summary>
    /// <param name="streamName">Stream whose head position is read.</param>
    /// <param name="consumerId">
    /// Consumer whose checkpoint is read. The checkpoint lookup is keyed by
    /// this value only; <paramref name="streamName"/> is not passed to it.
    /// </param>
    /// <param name="ct">Cancellation token forwarded to both store calls.</param>
    /// <returns>
    /// A <see cref="ConsumerLagResult"/> carrying the consumer id, both
    /// positions, the computed lag, and a status: Unhealthy when the lag
    /// exceeds 10000, Degraded when it exceeds 1000, Healthy otherwise.
    /// </returns>
    public async Task<ConsumerLagResult> CheckLagAsync(
        string streamName,
        string consumerId,
        CancellationToken ct)
    {
        // The checkpoint is fetched before the head, one call at a time.
        var consumerPosition = await _checkpointStore.GetCheckpointAsync(consumerId, ct);
        var headPosition = await _eventStore.GetStreamHeadAsync(streamName, ct);
        var behindBy = headPosition - consumerPosition;

        // Arms are ordered from the most severe threshold down, so the first
        // match is the strictest status that applies.
        var status = behindBy switch
        {
            > UnhealthyLagThreshold => HealthStatus.Unhealthy,
            > DegradedLagThreshold => HealthStatus.Degraded,
            _ => HealthStatus.Healthy
        };

        return new ConsumerLagResult
        {
            ConsumerId = consumerId,
            Checkpoint = consumerPosition,
            StreamHead = headPosition,
            Lag = behindBy,
            Status = status
        };
    }
}
```

## Stall Detection

```csharp
/// <summary>
/// Flags a consumer as stalled when its last recorded update is older than
/// a caller-supplied threshold.
/// </summary>
/// <remarks>
/// <c>GetLastUpdateTimeAsync</c> is not defined in this snippet; it is
/// expected to be provided by the surrounding class definition.
/// </remarks>
public class ConsumerStallDetector
{
    /// <summary>
    /// Compares the time elapsed since the consumer's last update against
    /// <paramref name="threshold"/>.
    /// </summary>
    /// <param name="consumerId">Consumer whose last update time is looked up.</param>
    /// <param name="threshold">Longest gap since the last update that is still not treated as a stall.</param>
    /// <param name="ct">Cancellation token forwarded to the lookup.</param>
    /// <returns>
    /// <c>true</c> when the elapsed time is strictly greater than
    /// <paramref name="threshold"/>; otherwise <c>false</c>.
    /// </returns>
    public async Task<bool> IsConsumerStalledAsync(
        string consumerId,
        TimeSpan threshold,
        CancellationToken ct)
    {
        var lastProgressAt = await GetLastUpdateTimeAsync(consumerId, ct);

        // The current UTC time is read only after the lookup has completed.
        return DateTimeOffset.UtcNow - lastProgressAt > threshold;
    }
}
```

## See Also

- [Health Checks Overview](README.md)
- [Stream Health](stream-health.md)