# Fault Tolerance
Heartbeat monitoring, stale consumer cleanup, and recovery mechanisms.
## Overview
Consumer groups provide fault tolerance through heartbeat monitoring, automatic stale consumer cleanup, and offset-based recovery. These mechanisms ensure reliable event processing even when consumers fail.
## Heartbeat Monitoring
### How It Works
```
Consumer registers → Send heartbeat every 10s → Last heartbeat tracked in DB
If no heartbeat for 30s → Consumer marked as stale → Removed from group
```
### Configuration
```csharp
var options = new ConsumerGroupOptions
{
    HeartbeatInterval = TimeSpan.FromSeconds(10), // Send heartbeat every 10s
    SessionTimeout = TimeSpan.FromSeconds(30)     // Mark stale after 30s
};

await foreach (var @event in _consumerGroup.ConsumeAsync(..., options))
{
    // Heartbeats sent automatically in background
    await ProcessEventAsync(@event);
}
```
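The background heartbeat can be pictured as a timer loop that runs alongside event processing. The following is a minimal sketch, not the library's actual implementation: `HeartbeatLoop` and the `sendHeartbeat` delegate are hypothetical names introduced for illustration, and `PeriodicTimer` requires .NET 6 or later.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch; the library's internal heartbeat loop may differ.
public static class HeartbeatLoop
{
    // Invokes sendHeartbeat once per interval until the token is cancelled.
    public static async Task RunAsync(
        TimeSpan interval,
        Func<CancellationToken, Task> sendHeartbeat,
        CancellationToken cancellationToken)
    {
        using var timer = new PeriodicTimer(interval);
        try
        {
            while (await timer.WaitForNextTickAsync(cancellationToken))
            {
                try
                {
                    await sendHeartbeat(cancellationToken);
                }
                catch (Exception ex) when (ex is not OperationCanceledException)
                {
                    // A single failed heartbeat is reported and retried on the next tick.
                    Console.Error.WriteLine($"Heartbeat failed: {ex.Message}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when the consumer shuts down.
        }
    }
}
```

With a 10s interval and a 30s session timeout, a loop like this can miss two consecutive heartbeats before the consumer becomes eligible for cleanup.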
### Heartbeat Storage
```sql
SELECT * FROM consumer_registrations;
```
| stream_name | group_id | consumer_id | last_heartbeat | session_timeout_ms |
|-------------|----------|-------------|----------------|---------------------|
| orders | email-notifications | worker-1 | 2025-12-10 10:30:00 | 30000 |
| orders | email-notifications | worker-2 | 2025-12-10 10:29:45 | 30000 |
## Stale Consumer Cleanup
### Automatic Cleanup
Background service removes stale consumers:
```csharp
builder.Services.AddPostgresConsumerGroups(options =>
{
    options.CleanupInterval = TimeSpan.FromMinutes(1); // Run cleanup every minute
});
```
### Cleanup Function
```sql
CREATE OR REPLACE FUNCTION cleanup_stale_consumers()
RETURNS INTEGER AS $$
DECLARE
    deleted_count INTEGER;
BEGIN
    -- Remove every registration whose last heartbeat is older than its own timeout
    DELETE FROM consumer_registrations
    WHERE last_heartbeat < NOW() - (session_timeout_ms || ' milliseconds')::INTERVAL;

    GET DIAGNOSTICS deleted_count = ROW_COUNT;
    RETURN deleted_count;
END;
$$ LANGUAGE plpgsql;
```
### Manual Cleanup
```csharp
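public class ConsumerCleanup
{
    private readonly IConsumerOffsetStore _offsetStore;
    private readonly ILogger<ConsumerCleanup> _logger;

    public ConsumerCleanup(IConsumerOffsetStore offsetStore, ILogger<ConsumerCleanup> logger)
    {
        _offsetStore = offsetStore;
        _logger = logger;
    }

    public async Task RemoveStaleConsumersAsync()
    {
        var removed = await _offsetStore.CleanupStaleConsumersAsync();
        _logger.LogInformation("Removed {Count} stale consumers", removed);
    }
}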
```
## Consumer Failure Recovery
### Scenario 1: Graceful Shutdown
Consumer stops cleanly:
```csharp
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    try
    {
        await foreach (var @event in _consumerGroup.ConsumeAsync(..., stoppingToken))
        {
            await ProcessEventAsync(@event);
        }
    }
    catch (OperationCanceledException)
    {
        _logger.LogInformation("Shutting down gracefully");
        // Consumer unregisters, offset committed
    }
}
```
**Recovery:**
- Offset committed before shutdown
- Consumer unregistered from group
- No reprocessing needed
### Scenario 2: Unexpected Crash
Consumer crashes without cleanup:
```
T=0: Consumer processes events 1-500, commits offset 500
T=10: Consumer processes events 501-1000 (not committed)
T=15: Consumer crashes
T=45: Stale consumer removed (30s timeout)
T=46: New consumer starts, resumes from offset 501
```
**Recovery:**
- Events 501-1000 reprocessed
- Requires idempotent handlers
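The replayed range in the timeline above follows directly from the last committed offset. A minimal sketch of that arithmetic, assuming a hypothetical `CrashRecovery` helper that is not part of the library:

```csharp
using System;

// Hypothetical helper for illustration only; not part of the library API.
public static class CrashRecovery
{
    // Returns the inclusive range of offsets a replacement consumer will see again,
    // or null when everything that was processed had already been committed.
    public static (long First, long Last)? ReplayRange(long lastCommitted, long lastProcessed)
    {
        if (lastProcessed <= lastCommitted)
        {
            return null;
        }

        return (lastCommitted + 1, lastProcessed);
    }
}
```

For the scenario above, `CrashRecovery.ReplayRange(500, 1000)` yields the range 501 to 1000, so committing more often shrinks the number of events that must be handled twice.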
### Scenario 3: Network Partition
Consumer loses connectivity:
```
T=0: Consumer processing normally
T=10: Network partition (no heartbeats sent)
T=40: Consumer marked stale, removed from group
T=45: Network restored
T=46: Consumer attempts to commit offset → Fails (not registered)
T=47: Consumer re-registers and resumes
```
**Recovery:**
- Consumer re-registers automatically
- Resumes from last committed offset
- Some events may be reprocessed
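The rejected commit and the re-registration that follows can be simulated in memory. This is a sketch under stated assumptions: `InMemoryGroup` and `PartitionRecovery` are hypothetical stand-ins for the group coordinator and the consumer's recovery logic, not library types.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for the group coordinator, for illustration only.
public sealed class InMemoryGroup
{
    private readonly HashSet<string> _registered = new();

    public long CommittedOffset { get; private set; }

    public void Register(string consumerId) => _registered.Add(consumerId);

    // Simulates the cleanup job removing a consumer whose heartbeats stopped.
    public void RemoveStale(string consumerId) => _registered.Remove(consumerId);

    // A commit from a consumer that is no longer registered is rejected.
    public bool TryCommit(string consumerId, long offset)
    {
        if (!_registered.Contains(consumerId))
        {
            return false;
        }

        CommittedOffset = offset;
        return true;
    }
}

public static class PartitionRecovery
{
    // Tries to commit; on rejection, re-registers and returns the offset to resume from.
    public static long CommitOrResume(InMemoryGroup group, string consumerId, long offset)
    {
        if (group.TryCommit(consumerId, offset))
        {
            return offset + 1;
        }

        group.Register(consumerId);
        return group.CommittedOffset + 1;
    }
}
```

If a consumer committed offset 500, was removed during the partition, and then tries to commit 800, the sketch returns 501: events 501 to 800 are processed again, which is why the handlers need to be idempotent.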
## Idempotent Event Handlers
### Track Processed Events
```csharp
public class IdempotentOrderProcessor
{
    private readonly IProcessedEventRepository _repository;
    private readonly ILogger<IdempotentOrderProcessor> _logger;

    public async Task ProcessEventAsync(StoredEvent @event)
    {
        // Check if already processed
        if (await _repository.IsProcessedAsync(@event.EventId))
        {
            _logger.LogInformation(
                "Event {EventId} already processed, skipping",
                @event.EventId);
            return;
        }

        // Process event
        await ProcessOrderAsync(@event);

        // Mark as processed
        await _repository.MarkProcessedAsync(@event.EventId, @event.Offset);
    }
}
```
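The check-then-mark sequence in the handler above leaves a window in which two consumers can both see an event as unprocessed. One way to close it is to claim the event ID atomically before running the handler. The sketch below uses a `ConcurrentDictionary` as a stand-in for the processed-events store; `ClaimingProcessor` is a hypothetical name, not a library type.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical in-memory stand-in for the processed-events store.
public sealed class ClaimingProcessor
{
    private readonly ConcurrentDictionary<string, long> _processed = new();

    // Returns true if the handler ran, false if the event had already been claimed.
    public async Task<bool> ProcessOnceAsync(string eventId, long offset, Func<Task> handler)
    {
        // TryAdd is atomic: only one caller can claim a given event ID.
        if (!_processed.TryAdd(eventId, offset))
        {
            return false;
        }

        try
        {
            await handler();
            return true;
        }
        catch
        {
            // Release the claim so the event can be retried after a failure.
            _processed.TryRemove(eventId, out _);
            throw;
        }
    }
}
```

Against PostgreSQL, a comparable effect is usually obtained by inserting the event ID with `INSERT ... ON CONFLICT DO NOTHING` in the same transaction as the handler's own writes, relying on the primary key to reject duplicates.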
### Database Schema
```sql
CREATE TABLE processed_events (
    event_id TEXT PRIMARY KEY,
    "offset" BIGINT NOT NULL, -- quoted because OFFSET is a reserved word in PostgreSQL
    processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_processed_events_offset ON processed_events("offset");
```
### With EF Core
```csharp
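public class ProcessedEvent
{
    public string EventId { get; set; } = string.Empty;
    public long Offset { get; set; }
    public DateTimeOffset ProcessedAt { get; set; }
}

public class ApplicationDbContext : DbContext
{
    public DbSet<ProcessedEvent> ProcessedEvents { get; set; }

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        // EventId does not match EF Core's key naming convention, so declare it explicitly
        modelBuilder.Entity<ProcessedEvent>().HasKey(e => e.EventId);
    }
}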
```
## Monitoring Stale Consumers
### Health Check
```csharp
public class ConsumerHealthCheck : IHealthCheck
{
    private readonly IConsumerOffsetStore _offsetStore;

    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken cancellationToken)
    {
        var consumers = await _offsetStore.GetConsumersAsync("orders", "email-notifications");

        // Materialize once so the query is not re-evaluated for Count and Join
        var staleConsumers = consumers
            .Where(c => DateTimeOffset.UtcNow - c.LastHeartbeat > TimeSpan.FromMinutes(1))
            .ToList();

        if (staleConsumers.Count > 0)
        {
            return HealthCheckResult.Degraded(
                $"Stale consumers: {string.Join(", ", staleConsumers.Select(c => c.ConsumerId))}");
        }

        return HealthCheckResult.Healthy($"{consumers.Count} active consumers");
    }
}
```
### Monitoring Service
```csharp
public class ConsumerMonitor : BackgroundService
{
    // _offsetStore, _logger, and _metrics are injected through the constructor (omitted here)
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var consumers = await _offsetStore.GetConsumersAsync("orders", "email-notifications");

            foreach (var consumer in consumers)
            {
                var timeSinceHeartbeat = DateTimeOffset.UtcNow - consumer.LastHeartbeat;

                // Warn at 20s, before the 30s session timeout is reached
                if (timeSinceHeartbeat > TimeSpan.FromSeconds(20))
                {
                    _logger.LogWarning(
                        "Consumer {ConsumerId} has not sent heartbeat for {Seconds}s",
                        consumer.ConsumerId,
                        timeSinceHeartbeat.TotalSeconds);
                }

                _metrics.RecordGauge(
                    "consumer.heartbeat.age",
                    timeSinceHeartbeat.TotalSeconds,
                    new[] { new KeyValuePair<string, object>("consumer_id", consumer.ConsumerId) });
            }

            await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken);
        }
    }
}
```
## Split-Brain Prevention
### Consumer ID Uniqueness
Always use unique consumer IDs:
```csharp
// ✅ Good - Unique per instance
var consumerId = $"{Environment.MachineName}-{Process.GetCurrentProcess().Id}-{Guid.NewGuid():N}";
// ❌ Bad - Same ID across instances
var consumerId = "worker"; // Multiple instances will conflict
```
### Registration Check
```csharp
public async Task<bool> IsConsumerActiveAsync(string streamName, string groupId, string consumerId)
{
    var consumers = await _offsetStore.GetConsumersAsync(streamName, groupId);
    var consumer = consumers.FirstOrDefault(c => c.ConsumerId == consumerId);

    if (consumer == null)
        return false;

    // Active only while the last heartbeat is younger than the session timeout
    var timeSinceHeartbeat = DateTimeOffset.UtcNow - consumer.LastHeartbeat;
    return timeSinceHeartbeat < consumer.SessionTimeout;
}
```
## Best Practices
### ✅ DO
- Use unique consumer IDs
- Implement idempotent event handlers
- Monitor stale consumers
- Set appropriate session timeouts (30-60s)
- Log heartbeat failures
- Test failure scenarios
### ❌ DON'T
- Don't reuse consumer IDs
- Don't ignore stale consumer warnings
- Don't set very short timeouts (< 10s)
- Don't skip idempotency checks
- Don't process without error handling
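The timeout guidance above can be checked at startup. The following is a minimal sketch, assuming a hypothetical `TimeoutGuidelines` helper that is not part of the library; the 10s floor comes from the list above, and the rule that the heartbeat interval must be shorter than the session timeout is an added assumption that follows from how staleness is detected.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration only; not part of the library API.
public static class TimeoutGuidelines
{
    // Returns a list of problems; an empty list means the settings pass both checks.
    public static IReadOnlyList<string> Check(TimeSpan heartbeatInterval, TimeSpan sessionTimeout)
    {
        var problems = new List<string>();

        if (sessionTimeout < TimeSpan.FromSeconds(10))
        {
            problems.Add("Session timeout is below 10s; healthy consumers may be removed.");
        }

        if (heartbeatInterval >= sessionTimeout)
        {
            problems.Add("Heartbeat interval must be shorter than the session timeout.");
        }

        return problems;
    }
}
```

The values used throughout this page (10s heartbeat interval, 30s session timeout) pass both checks.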
## See Also
- [Consumer Groups Overview](README.md)
- [Offset Management](offset-management.md)
- [Health Checks](../../observability/health-checks/consumer-health.md)
- [Best Practices](../../best-practices/README.md)